OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "components/copresence/copresence_manager_impl.h" | 5 #include "components/copresence/copresence_manager_impl.h" |
6 | 6 |
| 7 #include <map> |
7 #include <vector> | 8 #include <vector> |
8 | 9 |
9 #include "base/bind.h" | 10 #include "base/bind.h" |
10 #include "base/strings/stringprintf.h" | 11 #include "base/strings/stringprintf.h" |
11 #include "base/time/time.h" | 12 #include "base/time/time.h" |
12 #include "base/timer/timer.h" | 13 #include "base/timer/timer.h" |
13 #include "components/copresence/copresence_state_impl.h" | 14 #include "components/copresence/copresence_state_impl.h" |
14 #include "components/copresence/handlers/directive_handler_impl.h" | 15 #include "components/copresence/handlers/directive_handler_impl.h" |
15 #include "components/copresence/handlers/gcm_handler_impl.h" | 16 #include "components/copresence/handlers/gcm_handler_impl.h" |
16 #include "components/copresence/proto/rpcs.pb.h" | 17 #include "components/copresence/proto/rpcs.pb.h" |
17 #include "components/copresence/public/whispernet_client.h" | 18 #include "components/copresence/public/whispernet_client.h" |
18 #include "components/copresence/rpc/rpc_handler.h" | 19 #include "components/copresence/rpc/rpc_handler.h" |
19 | 20 |
20 namespace { | 21 namespace { |
21 | 22 |
22 const int kPollTimerIntervalMs = 3000; // milliseconds. | 23 const int kPollTimerIntervalMs = 3000; // milliseconds. |
23 const int kAudioCheckIntervalMs = 1000; // milliseconds. | 24 const int kAudioCheckIntervalMs = 1000; // milliseconds. |
24 | 25 |
| 26 const int kQueuedMessageTimeout = 10; // seconds. |
| 27 const int kMaxQueuedMessages = 1000; |
| 28 |
25 } // namespace | 29 } // namespace |
26 | 30 |
27 namespace copresence { | 31 namespace copresence { |
28 | 32 |
| 33 bool SupportedTokenMedium(const TokenObservation& token) { |
| 34 for (const TokenSignals& signals : token.signals()) { |
| 35 if (signals.medium() == AUDIO_ULTRASOUND_PASSBAND || |
| 36 signals.medium() == AUDIO_AUDIBLE_DTMF) |
| 37 return true; |
| 38 } |
| 39 return false; |
| 40 } |
| 41 |
| 42 |
29 // Public functions. | 43 // Public functions. |
30 | 44 |
31 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) | 45 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) |
32 : delegate_(delegate), | 46 : delegate_(delegate), |
33 whispernet_init_callback_(base::Bind( | 47 whispernet_init_callback_(base::Bind( |
34 &CopresenceManagerImpl::WhispernetInitComplete, | 48 &CopresenceManagerImpl::WhispernetInitComplete, |
35 // This callback gets cancelled when we are destroyed. | 49 // This callback gets cancelled when we are destroyed. |
36 base::Unretained(this))), | 50 base::Unretained(this))), |
37 init_failed_(false), | 51 init_failed_(false), |
38 state_(new CopresenceStateImpl), | 52 state_(new CopresenceStateImpl), |
39 directive_handler_(new DirectiveHandlerImpl( | 53 directive_handler_(new DirectiveHandlerImpl( |
40 // The directive handler and its descendants | 54 // The directive handler and its descendants |
41 // will be destructed before the CopresenceState instance. | 55 // will be destructed before the CopresenceState instance. |
42 base::Bind(&CopresenceStateImpl::UpdateDirectives, | 56 base::Bind(&CopresenceStateImpl::UpdateDirectives, |
43 base::Unretained(state_.get())))), | 57 base::Unretained(state_.get())))), |
44 poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), | 58 poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), |
45 audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>) { | 59 audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), |
| 60 queued_messages_by_token_( |
| 61 base::TimeDelta::FromSeconds(kQueuedMessageTimeout), |
| 62 kMaxQueuedMessages) { |
46 DCHECK(delegate_); | 63 DCHECK(delegate_); |
47 DCHECK(delegate_->GetWhispernetClient()); | 64 DCHECK(delegate_->GetWhispernetClient()); |
48 delegate_->GetWhispernetClient()->Initialize( | 65 delegate_->GetWhispernetClient()->Initialize( |
49 whispernet_init_callback_.callback()); | 66 whispernet_init_callback_.callback()); |
50 | 67 |
| 68 MessagesCallback messages_callback = base::Bind( |
| 69 &CopresenceManagerImpl::DispatchMessages< |
| 70 google::protobuf::RepeatedPtrField<SubscribedMessage>>, |
| 71 // This will only be passed to objects that we own. |
| 72 base::Unretained(this)); |
| 73 |
51 if (delegate->GetGCMDriver()) | 74 if (delegate->GetGCMDriver()) |
52 gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(), | 75 gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(), |
53 directive_handler_.get())); | 76 directive_handler_.get(), |
| 77 messages_callback)); |
54 | 78 |
55 rpc_handler_.reset(new RpcHandler(delegate, | 79 rpc_handler_.reset(new RpcHandler(delegate, |
56 state_.get(), | 80 state_.get(), |
57 directive_handler_.get(), | 81 directive_handler_.get(), |
58 gcm_handler_.get())); | 82 gcm_handler_.get(), |
| 83 messages_callback)); |
59 } | 84 } |
60 | 85 |
61 CopresenceManagerImpl::~CopresenceManagerImpl() { | 86 CopresenceManagerImpl::~CopresenceManagerImpl() { |
62 whispernet_init_callback_.Cancel(); | 87 whispernet_init_callback_.Cancel(); |
63 } | 88 } |
64 | 89 |
65 CopresenceState* CopresenceManagerImpl::state() { | 90 CopresenceState* CopresenceManagerImpl::state() { |
66 return state_.get(); | 91 return state_.get(); |
67 } | 92 } |
68 | 93 |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
106 } else { | 131 } else { |
107 LOG(ERROR) << "Whispernet initialization failed!"; | 132 LOG(ERROR) << "Whispernet initialization failed!"; |
108 init_failed_ = true; | 133 init_failed_ = true; |
109 } | 134 } |
110 } | 135 } |
111 | 136 |
112 void CopresenceManagerImpl::ReceivedTokens( | 137 void CopresenceManagerImpl::ReceivedTokens( |
113 const std::vector<AudioToken>& tokens) { | 138 const std::vector<AudioToken>& tokens) { |
114 rpc_handler_->ReportTokens(tokens); | 139 rpc_handler_->ReportTokens(tokens); |
115 | 140 |
116 // Update the CopresenceState. | |
117 for (const AudioToken audio_token : tokens) { | 141 for (const AudioToken audio_token : tokens) { |
118 DVLOG(3) << "Heard token: " << audio_token.token; | 142 const std::string& token_id = audio_token.token; |
| 143 DVLOG(3) << "Heard token: " << token_id; |
| 144 |
| 145 // Update the CopresenceState. |
119 ReceivedToken token( | 146 ReceivedToken token( |
120 audio_token.token, | 147 token_id, |
121 audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND, | 148 audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND, |
122 base::Time::Now()); | 149 base::Time::Now()); |
123 state_->UpdateReceivedToken(token); | 150 state_->UpdateReceivedToken(token); |
| 151 |
| 152 // Deliver messages that were pre-sent on this token. |
| 153 if (queued_messages_by_token_.HasKey(token_id)) { |
| 154 // Not const because we have to remove the required tokens for delivery. |
| 155 // We're going to delete this whole vector at the end anyway. |
| 156 std::vector<SubscribedMessage>* messages = |
| 157 queued_messages_by_token_.GetMutableValue(token_id); |
| 158 DCHECK(!messages->empty()) << "Empty entry in queued_messages_by_token_"; |
| 159 |
| 160 // These messages still have their required tokens stored, and |
| 161 // DispatchMessages() will still check for them. If we don't remove |
| 162 // the tokens before delivery, we'll just end up re-queuing the message. |
| 163 for (SubscribedMessage& message : *messages) |
| 164 message.mutable_required_token()->Clear(); |
| 165 |
| 166 DVLOG(3) << "Delivering " << messages->size() |
| 167 << " message(s) pre-sent on token " << token_id; |
| 168 DispatchMessages(*messages); |
| 169 |
| 170 // The messages have been delivered, so we don't need to keep them |
| 171 // in the queue. Note that the token will still be reported |
| 172 // to the server (above), so we'll keep getting the message. |
| 173 // But we can now drop our local copy of it. |
| 174 int erase_count = queued_messages_by_token_.Erase(token_id); |
| 175 DCHECK_GT(erase_count, 0); |
| 176 } |
124 } | 177 } |
125 } | 178 } |
126 | 179 |
| 180 void CopresenceManagerImpl::AudioCheck() { |
| 181 if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && |
| 182 !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { |
| 183 delegate_->HandleStatusUpdate(AUDIO_FAIL); |
| 184 } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && |
| 185 !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { |
| 186 delegate_->HandleStatusUpdate(AUDIO_FAIL); |
| 187 } |
| 188 } |
| 189 |
| 190 // Report our currently playing tokens to the server. |
127 void CopresenceManagerImpl::PollForMessages() { | 191 void CopresenceManagerImpl::PollForMessages() { |
128 // Report our currently playing tokens. | |
129 const std::string& audible_token = | 192 const std::string& audible_token = |
130 directive_handler_->GetCurrentAudioToken(AUDIBLE); | 193 directive_handler_->GetCurrentAudioToken(AUDIBLE); |
131 const std::string& inaudible_token = | 194 const std::string& inaudible_token = |
132 directive_handler_->GetCurrentAudioToken(INAUDIBLE); | 195 directive_handler_->GetCurrentAudioToken(INAUDIBLE); |
133 | 196 |
134 std::vector<AudioToken> tokens; | 197 std::vector<AudioToken> tokens; |
135 if (!audible_token.empty()) | 198 if (!audible_token.empty()) |
136 tokens.push_back(AudioToken(audible_token, true)); | 199 tokens.push_back(AudioToken(audible_token, true)); |
137 if (!inaudible_token.empty()) | 200 if (!inaudible_token.empty()) |
138 tokens.push_back(AudioToken(inaudible_token, false)); | 201 tokens.push_back(AudioToken(inaudible_token, false)); |
139 | 202 |
140 if (!tokens.empty()) | 203 if (!tokens.empty()) |
141 rpc_handler_->ReportTokens(tokens); | 204 rpc_handler_->ReportTokens(tokens); |
142 } | 205 } |
143 | 206 |
144 void CopresenceManagerImpl::AudioCheck() { | 207 template<typename IterableSubscribedMessages> |
145 if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && | 208 void CopresenceManagerImpl::DispatchMessages( |
146 !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { | 209 const IterableSubscribedMessages& messages) { |
147 delegate_->HandleStatusUpdate(AUDIO_FAIL); | 210 if (messages.size() == 0) |
148 } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && | 211 return; |
149 !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { | 212 |
150 delegate_->HandleStatusUpdate(AUDIO_FAIL); | 213 // Index the messages by subscription id. |
| 214 std::map<std::string, std::vector<Message>> messages_by_subscription; |
| 215 DVLOG(3) << "Processing " << messages.size() << " received message(s)."; |
| 216 int immediate_message_count = 0; |
| 217 for (const SubscribedMessage& message : messages) { |
| 218 // If tokens are required for this message, queue it. |
| 219 // Otherwise stage it for delivery. |
| 220 if (message.required_token_size() > 0) { |
| 221 int supported_token_count = 0; |
| 222 for (const TokenObservation& token : message.required_token()) { |
| 223 if (SupportedTokenMedium(token)) { |
| 224 queued_messages_by_token_.GetMutableValue(token.token_id()) |
| 225 ->push_back(message); |
| 226 supported_token_count++; |
| 227 } |
| 228 } |
| 229 |
| 230 if (supported_token_count > 0) { |
| 231 DVLOG(3) << "Queued message under " << supported_token_count |
| 232 << "token(s)."; |
| 233 } else { |
| 234 VLOG(2) << "Discarded message that requires one of " |
| 235 << message.required_token_size() |
| 236 << " token(s), all on unsupported mediums."; |
| 237 } |
| 238 } else { |
| 239 immediate_message_count++; |
| 240 for (const std::string& subscription_id : message.subscription_id()) { |
| 241 messages_by_subscription[subscription_id].push_back( |
| 242 message.published_message()); |
| 243 } |
| 244 } |
| 245 } |
| 246 |
| 247 // Send the messages for each subscription. |
| 248 DVLOG(3) << "Dispatching " << immediate_message_count << "message(s) for " |
| 249 << messages_by_subscription.size() << " subscription(s)."; |
| 250 for (const auto& map_entry : messages_by_subscription) { |
| 251 // TODO(ckehoe): Once we have the app ID from the server, we need to pass |
| 252 // it in here and get rid of the app id registry from the main API class. |
| 253 const std::string& subscription = map_entry.first; |
| 254 const std::vector<Message>& messages = map_entry.second; |
| 255 delegate_->HandleMessages(std::string(), subscription, messages); |
151 } | 256 } |
152 } | 257 } |
153 | 258 |
154 } // namespace copresence | 259 } // namespace copresence |
OLD | NEW |