OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "components/copresence/copresence_manager_impl.h" | 5 #include "components/copresence/copresence_manager_impl.h" |
6 | 6 |
| 7 #include <map> |
7 #include <vector> | 8 #include <vector> |
8 | 9 |
9 #include "base/bind.h" | 10 #include "base/bind.h" |
10 #include "base/strings/stringprintf.h" | 11 #include "base/strings/stringprintf.h" |
11 #include "base/time/time.h" | 12 #include "base/time/time.h" |
12 #include "base/timer/timer.h" | 13 #include "base/timer/timer.h" |
13 #include "components/copresence/copresence_state_impl.h" | 14 #include "components/copresence/copresence_state_impl.h" |
14 #include "components/copresence/handlers/directive_handler_impl.h" | 15 #include "components/copresence/handlers/directive_handler_impl.h" |
15 #include "components/copresence/handlers/gcm_handler_impl.h" | 16 #include "components/copresence/handlers/gcm_handler_impl.h" |
16 #include "components/copresence/proto/rpcs.pb.h" | 17 #include "components/copresence/proto/rpcs.pb.h" |
17 #include "components/copresence/public/whispernet_client.h" | 18 #include "components/copresence/public/whispernet_client.h" |
18 #include "components/copresence/rpc/rpc_handler.h" | 19 #include "components/copresence/rpc/rpc_handler.h" |
19 | 20 |
| 21 using google::protobuf::RepeatedPtrField; |
| 22 |
20 namespace { | 23 namespace { |
21 | 24 |
22 const int kPollTimerIntervalMs = 3000; // milliseconds. | 25 const int kPollTimerIntervalMs = 3000; // milliseconds. |
23 const int kAudioCheckIntervalMs = 1000; // milliseconds. | 26 const int kAudioCheckIntervalMs = 1000; // milliseconds. |
24 | 27 |
| 28 const int kQueuedMessageTimeout = 10; // seconds. |
| 29 const int kMaxQueuedMessages = 1000; |
| 30 |
25 } // namespace | 31 } // namespace |
26 | 32 |
27 namespace copresence { | 33 namespace copresence { |
28 | 34 |
| 35 bool SupportedTokenMedium(const TokenObservation& token) { |
| 36 for (const TokenSignals& signals : token.signals()) { |
| 37 if (signals.medium() == AUDIO_ULTRASOUND_PASSBAND || |
| 38 signals.medium() == AUDIO_AUDIBLE_DTMF) |
| 39 return true; |
| 40 } |
| 41 return false; |
| 42 } |
| 43 |
| 44 |
29 // Public functions. | 45 // Public functions. |
30 | 46 |
31 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) | 47 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) |
32 : delegate_(delegate), | 48 : delegate_(delegate), |
33 whispernet_init_callback_(base::Bind( | 49 whispernet_init_callback_(base::Bind( |
34 &CopresenceManagerImpl::WhispernetInitComplete, | 50 &CopresenceManagerImpl::WhispernetInitComplete, |
35 // This callback gets cancelled when we are destroyed. | 51 // This callback gets cancelled when we are destroyed. |
36 base::Unretained(this))), | 52 base::Unretained(this))), |
37 init_failed_(false), | 53 init_failed_(false), |
38 state_(new CopresenceStateImpl), | 54 state_(new CopresenceStateImpl), |
39 directive_handler_(new DirectiveHandlerImpl( | 55 directive_handler_(new DirectiveHandlerImpl( |
40 // The directive handler and its descendants | 56 // The directive handler and its descendants |
41 // will be destructed before the CopresenceState instance. | 57 // will be destructed before the CopresenceState instance. |
42 base::Bind(&CopresenceStateImpl::UpdateDirectives, | 58 base::Bind(&CopresenceStateImpl::UpdateDirectives, |
43 base::Unretained(state_.get())))), | 59 base::Unretained(state_.get())))), |
44 poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), | 60 poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), |
45 audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>) { | 61 audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), |
| 62 queued_messages_by_token_( |
| 63 base::TimeDelta::FromSeconds(kQueuedMessageTimeout), |
| 64 kMaxQueuedMessages) { |
46 DCHECK(delegate_); | 65 DCHECK(delegate_); |
47 DCHECK(delegate_->GetWhispernetClient()); | 66 DCHECK(delegate_->GetWhispernetClient()); |
48 delegate_->GetWhispernetClient()->Initialize( | 67 delegate_->GetWhispernetClient()->Initialize( |
49 whispernet_init_callback_.callback()); | 68 whispernet_init_callback_.callback()); |
50 | 69 |
| 70 MessagesCallback messages_callback = base::Bind( |
| 71 &CopresenceManagerImpl::DispatchMessages, |
| 72 // This will only be passed to objects that we own. |
| 73 base::Unretained(this)); |
| 74 |
51 if (delegate->GetGCMDriver()) | 75 if (delegate->GetGCMDriver()) |
52 gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(), | 76 gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(), |
53 directive_handler_.get())); | 77 directive_handler_.get(), |
| 78 messages_callback)); |
54 | 79 |
55 rpc_handler_.reset(new RpcHandler(delegate, | 80 rpc_handler_.reset(new RpcHandler(delegate, |
56 state_.get(), | 81 state_.get(), |
57 directive_handler_.get(), | 82 directive_handler_.get(), |
58 gcm_handler_.get())); | 83 gcm_handler_.get(), |
| 84 messages_callback)); |
59 } | 85 } |
60 | 86 |
61 CopresenceManagerImpl::~CopresenceManagerImpl() { | 87 CopresenceManagerImpl::~CopresenceManagerImpl() { |
62 whispernet_init_callback_.Cancel(); | 88 whispernet_init_callback_.Cancel(); |
63 } | 89 } |
64 | 90 |
65 CopresenceState* CopresenceManagerImpl::state() { | 91 CopresenceState* CopresenceManagerImpl::state() { |
66 return state_.get(); | 92 return state_.get(); |
67 } | 93 } |
68 | 94 |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
106 } else { | 132 } else { |
107 LOG(ERROR) << "Whispernet initialization failed!"; | 133 LOG(ERROR) << "Whispernet initialization failed!"; |
108 init_failed_ = true; | 134 init_failed_ = true; |
109 } | 135 } |
110 } | 136 } |
111 | 137 |
112 void CopresenceManagerImpl::ReceivedTokens( | 138 void CopresenceManagerImpl::ReceivedTokens( |
113 const std::vector<AudioToken>& tokens) { | 139 const std::vector<AudioToken>& tokens) { |
114 rpc_handler_->ReportTokens(tokens); | 140 rpc_handler_->ReportTokens(tokens); |
115 | 141 |
116 // Update the CopresenceState. | |
117 for (const AudioToken audio_token : tokens) { | 142 for (const AudioToken audio_token : tokens) { |
118 DVLOG(3) << "Heard token: " << audio_token.token; | 143 const std::string& token_id = audio_token.token; |
| 144 DVLOG(3) << "Heard token: " << token_id; |
| 145 |
| 146 // Update the CopresenceState. |
119 ReceivedToken token( | 147 ReceivedToken token( |
120 audio_token.token, | 148 token_id, |
121 audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND, | 149 audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND, |
122 base::Time::Now()); | 150 base::Time::Now()); |
123 state_->UpdateReceivedToken(token); | 151 state_->UpdateReceivedToken(token); |
| 152 |
| 153 // Deliver messages that were pre-sent on this token. |
| 154 if (queued_messages_by_token_.HasKey(token_id)) { |
| 155 // Not const because we have to remove the required tokens for delivery. |
| 156 // We're going to delete this whole vector at the end anyway. |
| 157 RepeatedPtrField<SubscribedMessage>* messages = |
| 158 queued_messages_by_token_.GetMutableValue(token_id); |
| 159 DCHECK_GT(messages->size(), 0) |
| 160 << "Empty entry in queued_messages_by_token_"; |
| 161 |
| 162 // These messages still have their required tokens stored, and |
| 163 // DispatchMessages() will still check for them. If we don't remove |
| 164 // the tokens before delivery, we'll just end up re-queuing the message. |
| 165 for (SubscribedMessage& message : *messages) |
| 166 message.mutable_required_token()->Clear(); |
| 167 |
| 168 DVLOG(3) << "Delivering " << messages->size() |
| 169 << " message(s) pre-sent on token " << token_id; |
| 170 DispatchMessages(*messages); |
| 171 |
| 172 // The messages have been delivered, so we don't need to keep them |
| 173 // in the queue. Note that the token will still be reported |
| 174 // to the server (above), so we'll keep getting the message. |
| 175 // But we can now drop our local copy of it. |
| 176 int erase_count = queued_messages_by_token_.Erase(token_id); |
| 177 DCHECK_GT(erase_count, 0); |
| 178 } |
124 } | 179 } |
125 } | 180 } |
126 | 181 |
| 182 void CopresenceManagerImpl::AudioCheck() { |
| 183 if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && |
| 184 !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { |
| 185 delegate_->HandleStatusUpdate(AUDIO_FAIL); |
| 186 } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && |
| 187 !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { |
| 188 delegate_->HandleStatusUpdate(AUDIO_FAIL); |
| 189 } |
| 190 } |
| 191 |
| 192 // Report our currently playing tokens to the server. |
127 void CopresenceManagerImpl::PollForMessages() { | 193 void CopresenceManagerImpl::PollForMessages() { |
128 // Report our currently playing tokens. | |
129 const std::string& audible_token = | 194 const std::string& audible_token = |
130 directive_handler_->GetCurrentAudioToken(AUDIBLE); | 195 directive_handler_->GetCurrentAudioToken(AUDIBLE); |
131 const std::string& inaudible_token = | 196 const std::string& inaudible_token = |
132 directive_handler_->GetCurrentAudioToken(INAUDIBLE); | 197 directive_handler_->GetCurrentAudioToken(INAUDIBLE); |
133 | 198 |
134 std::vector<AudioToken> tokens; | 199 std::vector<AudioToken> tokens; |
135 if (!audible_token.empty()) | 200 if (!audible_token.empty()) |
136 tokens.push_back(AudioToken(audible_token, true)); | 201 tokens.push_back(AudioToken(audible_token, true)); |
137 if (!inaudible_token.empty()) | 202 if (!inaudible_token.empty()) |
138 tokens.push_back(AudioToken(inaudible_token, false)); | 203 tokens.push_back(AudioToken(inaudible_token, false)); |
139 | 204 |
140 if (!tokens.empty()) | 205 if (!tokens.empty()) |
141 rpc_handler_->ReportTokens(tokens); | 206 rpc_handler_->ReportTokens(tokens); |
142 } | 207 } |
143 | 208 |
144 void CopresenceManagerImpl::AudioCheck() { | 209 void CopresenceManagerImpl::DispatchMessages( |
145 if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && | 210 const RepeatedPtrField<SubscribedMessage>& messages) { |
146 !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { | 211 if (messages.size() == 0) |
147 delegate_->HandleStatusUpdate(AUDIO_FAIL); | 212 return; |
148 } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && | 213 |
149 !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { | 214 // Index the messages by subscription id. |
150 delegate_->HandleStatusUpdate(AUDIO_FAIL); | 215 std::map<std::string, std::vector<Message>> messages_by_subscription; |
| 216 DVLOG(3) << "Processing " << messages.size() << " received message(s)."; |
| 217 int immediate_message_count = 0; |
| 218 for (const SubscribedMessage& message : messages) { |
| 219 // If tokens are required for this message, queue it. |
| 220 // Otherwise stage it for delivery. |
| 221 if (message.required_token_size() > 0) { |
| 222 int supported_token_count = 0; |
| 223 for (const TokenObservation& token : message.required_token()) { |
| 224 if (SupportedTokenMedium(token)) { |
| 225 if (!queued_messages_by_token_.HasKey(token.token_id())) { |
| 226 queued_messages_by_token_.Add( |
| 227 token.token_id(), RepeatedPtrField<SubscribedMessage>()); |
| 228 } |
| 229 RepeatedPtrField<SubscribedMessage>* queued_messages = |
| 230 queued_messages_by_token_.GetMutableValue(token.token_id()); |
| 231 DCHECK(queued_messages); |
| 232 queued_messages->Add()->CopyFrom(message); |
| 233 supported_token_count++; |
| 234 } |
| 235 } |
| 236 |
| 237 if (supported_token_count > 0) { |
| 238 DVLOG(3) << "Queued message under " << supported_token_count |
| 239 << "token(s)."; |
| 240 } else { |
| 241 VLOG(2) << "Discarded message that requires one of " |
| 242 << message.required_token_size() |
| 243 << " token(s), all on unsupported mediums."; |
| 244 } |
| 245 } else { |
| 246 immediate_message_count++; |
| 247 for (const std::string& subscription_id : message.subscription_id()) { |
| 248 messages_by_subscription[subscription_id].push_back( |
| 249 message.published_message()); |
| 250 } |
| 251 } |
| 252 } |
| 253 |
| 254 // Send the messages for each subscription. |
| 255 DVLOG(3) << "Dispatching " << immediate_message_count << "message(s) for " |
| 256 << messages_by_subscription.size() << " subscription(s)."; |
| 257 for (const auto& map_entry : messages_by_subscription) { |
| 258 // TODO(ckehoe): Once we have the app ID from the server, we need to pass |
| 259 // it in here and get rid of the app id registry from the main API class. |
| 260 const std::string& subscription = map_entry.first; |
| 261 const std::vector<Message>& messages = map_entry.second; |
| 262 delegate_->HandleMessages(std::string(), subscription, messages); |
151 } | 263 } |
152 } | 264 } |
153 | 265 |
154 } // namespace copresence | 266 } // namespace copresence |
OLD | NEW |