| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "components/copresence/copresence_manager_impl.h" | |
| 6 | |
| 7 #include <map> | |
| 8 #include <utility> | |
| 9 #include <vector> | |
| 10 | |
| 11 #include "base/bind.h" | |
| 12 #include "base/strings/stringprintf.h" | |
| 13 #include "base/time/time.h" | |
| 14 #include "base/timer/timer.h" | |
| 15 #include "components/audio_modem/public/whispernet_client.h" | |
| 16 #include "components/copresence/copresence_state_impl.h" | |
| 17 #include "components/copresence/handlers/directive_handler_impl.h" | |
| 18 #include "components/copresence/handlers/gcm_handler_impl.h" | |
| 19 #include "components/copresence/proto/rpcs.pb.h" | |
| 20 #include "components/copresence/rpc/rpc_handler.h" | |
| 21 | |
| 22 using google::protobuf::RepeatedPtrField; | |
| 23 | |
| 24 using audio_modem::AUDIBLE; | |
| 25 using audio_modem::AudioToken; | |
| 26 using audio_modem::INAUDIBLE; | |
| 27 | |
| 28 namespace { | |
| 29 | |
| 30 const int kPollTimerIntervalMs = 3000; // milliseconds. | |
| 31 const int kAudioCheckIntervalMs = 1000; // milliseconds. | |
| 32 | |
| 33 const int kQueuedMessageTimeout = 10; // seconds. | |
| 34 const int kMaxQueuedMessages = 1000; | |
| 35 | |
| 36 } // namespace | |
| 37 | |
| 38 namespace copresence { | |
| 39 | |
| 40 bool SupportedTokenMedium(const TokenObservation& token) { | |
| 41 for (const TokenSignals& signals : token.signals()) { | |
| 42 if (signals.medium() == AUDIO_ULTRASOUND_PASSBAND || | |
| 43 signals.medium() == AUDIO_AUDIBLE_DTMF) | |
| 44 return true; | |
| 45 } | |
| 46 return false; | |
| 47 } | |
| 48 | |
| 49 | |
| 50 // Public functions. | |
| 51 | |
| 52 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) | |
| 53 : delegate_(delegate), | |
| 54 whispernet_init_callback_( | |
| 55 base::Bind(&CopresenceManagerImpl::WhispernetInitComplete, | |
| 56 // This callback gets cancelled when we are destroyed. | |
| 57 base::Unretained(this))), | |
| 58 init_failed_(false), | |
| 59 state_(new CopresenceStateImpl), | |
| 60 directive_handler_(new DirectiveHandlerImpl( | |
| 61 // The directive handler and its descendants | |
| 62 // will be destructed before the CopresenceState instance. | |
| 63 base::Bind(&CopresenceStateImpl::UpdateDirectives, | |
| 64 base::Unretained(state_.get())))), | |
| 65 poll_timer_(new base::RepeatingTimer), | |
| 66 audio_check_timer_(new base::RepeatingTimer), | |
| 67 queued_messages_by_token_( | |
| 68 base::TimeDelta::FromSeconds(kQueuedMessageTimeout), | |
| 69 kMaxQueuedMessages) { | |
| 70 DCHECK(delegate_); | |
| 71 DCHECK(delegate_->GetWhispernetClient()); | |
| 72 // TODO(ckehoe): Handle whispernet initialization in the whispernet component. | |
| 73 delegate_->GetWhispernetClient()->Initialize( | |
| 74 whispernet_init_callback_.callback()); | |
| 75 | |
| 76 MessagesCallback messages_callback = base::Bind( | |
| 77 &CopresenceManagerImpl::DispatchMessages, | |
| 78 // This will only be passed to objects that we own. | |
| 79 base::Unretained(this)); | |
| 80 | |
| 81 if (delegate->GetGCMDriver()) | |
| 82 gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(), | |
| 83 directive_handler_.get(), | |
| 84 messages_callback)); | |
| 85 | |
| 86 rpc_handler_.reset(new RpcHandler(delegate, | |
| 87 directive_handler_.get(), | |
| 88 state_.get(), | |
| 89 gcm_handler_.get(), | |
| 90 messages_callback)); | |
| 91 | |
| 92 directive_handler_->Start(delegate_->GetWhispernetClient(), | |
| 93 base::Bind(&CopresenceManagerImpl::ReceivedTokens, | |
| 94 base::Unretained(this))); | |
| 95 } | |
| 96 | |
| 97 CopresenceManagerImpl::~CopresenceManagerImpl() { | |
| 98 whispernet_init_callback_.Cancel(); | |
| 99 } | |
| 100 | |
| 101 CopresenceState* CopresenceManagerImpl::state() { | |
| 102 return state_.get(); | |
| 103 } | |
| 104 | |
| 105 // Returns false if any operations were malformed. | |
| 106 void CopresenceManagerImpl::ExecuteReportRequest( | |
| 107 const ReportRequest& request, | |
| 108 const std::string& app_id, | |
| 109 const std::string& auth_token, | |
| 110 const StatusCallback& callback) { | |
| 111 // If initialization has failed, reject all requests. | |
| 112 if (init_failed_) { | |
| 113 callback.Run(FAIL); | |
| 114 return; | |
| 115 } | |
| 116 | |
| 117 // We'll need to modify the ReportRequest, so we make our own copy to send. | |
| 118 std::unique_ptr<ReportRequest> request_copy(new ReportRequest(request)); | |
| 119 rpc_handler_->SendReportRequest(std::move(request_copy), app_id, auth_token, | |
| 120 callback); | |
| 121 } | |
| 122 | |
| 123 | |
| 124 // Private functions. | |
| 125 | |
| 126 void CopresenceManagerImpl::WhispernetInitComplete(bool success) { | |
| 127 if (success) { | |
| 128 DVLOG(3) << "Whispernet initialized successfully."; | |
| 129 poll_timer_->Start(FROM_HERE, | |
| 130 base::TimeDelta::FromMilliseconds(kPollTimerIntervalMs), | |
| 131 base::Bind(&CopresenceManagerImpl::PollForMessages, | |
| 132 base::Unretained(this))); | |
| 133 audio_check_timer_->Start( | |
| 134 FROM_HERE, base::TimeDelta::FromMilliseconds(kAudioCheckIntervalMs), | |
| 135 base::Bind(&CopresenceManagerImpl::AudioCheck, base::Unretained(this))); | |
| 136 } else { | |
| 137 LOG(ERROR) << "Whispernet initialization failed!"; | |
| 138 init_failed_ = true; | |
| 139 } | |
| 140 } | |
| 141 | |
| 142 void CopresenceManagerImpl::ReceivedTokens( | |
| 143 const std::vector<AudioToken>& tokens) { | |
| 144 rpc_handler_->ReportTokens(tokens); | |
| 145 | |
| 146 for (const AudioToken audio_token : tokens) { | |
| 147 const std::string& token_id = audio_token.token; | |
| 148 DVLOG(3) << "Heard token: " << token_id; | |
| 149 | |
| 150 // Update the CopresenceState. | |
| 151 ReceivedToken token( | |
| 152 token_id, | |
| 153 audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND, | |
| 154 base::Time::Now()); | |
| 155 state_->UpdateReceivedToken(token); | |
| 156 | |
| 157 // Deliver messages that were pre-sent on this token. | |
| 158 if (queued_messages_by_token_.HasKey(token_id)) { | |
| 159 // Not const because we have to remove the required tokens for delivery. | |
| 160 // We're going to delete this whole vector at the end anyway. | |
| 161 RepeatedPtrField<SubscribedMessage>* messages = | |
| 162 queued_messages_by_token_.GetMutableValue(token_id); | |
| 163 DCHECK_GT(messages->size(), 0) | |
| 164 << "Empty entry in queued_messages_by_token_"; | |
| 165 | |
| 166 // These messages still have their required tokens stored, and | |
| 167 // DispatchMessages() will still check for them. If we don't remove | |
| 168 // the tokens before delivery, we'll just end up re-queuing the message. | |
| 169 for (SubscribedMessage& message : *messages) | |
| 170 message.mutable_required_token()->Clear(); | |
| 171 | |
| 172 DVLOG(3) << "Delivering " << messages->size() | |
| 173 << " message(s) pre-sent on token " << token_id; | |
| 174 DispatchMessages(*messages); | |
| 175 | |
| 176 // The messages have been delivered, so we don't need to keep them | |
| 177 // in the queue. Note that the token will still be reported | |
| 178 // to the server (above), so we'll keep getting the message. | |
| 179 // But we can now drop our local copy of it. | |
| 180 int erase_count = queued_messages_by_token_.Erase(token_id); | |
| 181 DCHECK_GT(erase_count, 0); | |
| 182 } | |
| 183 } | |
| 184 } | |
| 185 | |
| 186 void CopresenceManagerImpl::AudioCheck() { | |
| 187 if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && | |
| 188 !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { | |
| 189 delegate_->HandleStatusUpdate(AUDIO_FAIL); | |
| 190 } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && | |
| 191 !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { | |
| 192 delegate_->HandleStatusUpdate(AUDIO_FAIL); | |
| 193 } | |
| 194 } | |
| 195 | |
| 196 // Report our currently playing tokens to the server. | |
| 197 void CopresenceManagerImpl::PollForMessages() { | |
| 198 const std::string& audible_token = | |
| 199 directive_handler_->GetCurrentAudioToken(AUDIBLE); | |
| 200 const std::string& inaudible_token = | |
| 201 directive_handler_->GetCurrentAudioToken(INAUDIBLE); | |
| 202 | |
| 203 std::vector<AudioToken> tokens; | |
| 204 if (!audible_token.empty()) | |
| 205 tokens.push_back(AudioToken(audible_token, true)); | |
| 206 if (!inaudible_token.empty()) | |
| 207 tokens.push_back(AudioToken(inaudible_token, false)); | |
| 208 | |
| 209 if (!tokens.empty()) | |
| 210 rpc_handler_->ReportTokens(tokens); | |
| 211 } | |
| 212 | |
| 213 void CopresenceManagerImpl::DispatchMessages( | |
| 214 const RepeatedPtrField<SubscribedMessage>& messages) { | |
| 215 if (messages.size() == 0) | |
| 216 return; | |
| 217 | |
| 218 // Index the messages by subscription id. | |
| 219 std::map<std::string, std::vector<Message>> messages_by_subscription; | |
| 220 DVLOG(3) << "Processing " << messages.size() << " received message(s)."; | |
| 221 int immediate_message_count = 0; | |
| 222 for (const SubscribedMessage& message : messages) { | |
| 223 // If tokens are required for this message, queue it. | |
| 224 // Otherwise stage it for delivery. | |
| 225 if (message.required_token_size() > 0) { | |
| 226 int supported_token_count = 0; | |
| 227 for (const TokenObservation& token : message.required_token()) { | |
| 228 if (SupportedTokenMedium(token)) { | |
| 229 if (!queued_messages_by_token_.HasKey(token.token_id())) { | |
| 230 queued_messages_by_token_.Add( | |
| 231 token.token_id(), RepeatedPtrField<SubscribedMessage>()); | |
| 232 } | |
| 233 RepeatedPtrField<SubscribedMessage>* queued_messages = | |
| 234 queued_messages_by_token_.GetMutableValue(token.token_id()); | |
| 235 DCHECK(queued_messages); | |
| 236 queued_messages->Add()->CopyFrom(message); | |
| 237 supported_token_count++; | |
| 238 } | |
| 239 } | |
| 240 | |
| 241 if (supported_token_count > 0) { | |
| 242 DVLOG(3) << "Queued message under " << supported_token_count | |
| 243 << "token(s)."; | |
| 244 } else { | |
| 245 VLOG(2) << "Discarded message that requires one of " | |
| 246 << message.required_token_size() | |
| 247 << " token(s), all on unsupported mediums."; | |
| 248 } | |
| 249 } else { | |
| 250 immediate_message_count++; | |
| 251 for (const std::string& subscription_id : message.subscription_id()) { | |
| 252 messages_by_subscription[subscription_id].push_back( | |
| 253 message.published_message()); | |
| 254 } | |
| 255 } | |
| 256 } | |
| 257 | |
| 258 // Send the messages for each subscription. | |
| 259 DVLOG(3) << "Dispatching " << immediate_message_count << "message(s) for " | |
| 260 << messages_by_subscription.size() << " subscription(s)."; | |
| 261 for (const auto& map_entry : messages_by_subscription) { | |
| 262 // TODO(ckehoe): Once we have the app ID from the server, we need to pass | |
| 263 // it in here and get rid of the app id registry from the main API class. | |
| 264 const std::string& subscription = map_entry.first; | |
| 265 const std::vector<Message>& messages = map_entry.second; | |
| 266 delegate_->HandleMessages(std::string(), subscription, messages); | |
| 267 } | |
| 268 } | |
| 269 | |
| 270 } // namespace copresence | |
| OLD | NEW |