OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "components/copresence/copresence_manager_impl.h" | |
6 | |
7 #include <map> | |
8 #include <utility> | |
9 #include <vector> | |
10 | |
11 #include "base/bind.h" | |
12 #include "base/strings/stringprintf.h" | |
13 #include "base/time/time.h" | |
14 #include "base/timer/timer.h" | |
15 #include "components/audio_modem/public/whispernet_client.h" | |
16 #include "components/copresence/copresence_state_impl.h" | |
17 #include "components/copresence/handlers/directive_handler_impl.h" | |
18 #include "components/copresence/handlers/gcm_handler_impl.h" | |
19 #include "components/copresence/proto/rpcs.pb.h" | |
20 #include "components/copresence/rpc/rpc_handler.h" | |
21 | |
22 using google::protobuf::RepeatedPtrField; | |
23 | |
24 using audio_modem::AUDIBLE; | |
25 using audio_modem::AudioToken; | |
26 using audio_modem::INAUDIBLE; | |
27 | |
28 namespace { | |
29 | |
30 const int kPollTimerIntervalMs = 3000; // milliseconds. | |
31 const int kAudioCheckIntervalMs = 1000; // milliseconds. | |
32 | |
33 const int kQueuedMessageTimeout = 10; // seconds. | |
34 const int kMaxQueuedMessages = 1000; | |
35 | |
36 } // namespace | |
37 | |
38 namespace copresence { | |
39 | |
40 bool SupportedTokenMedium(const TokenObservation& token) { | |
41 for (const TokenSignals& signals : token.signals()) { | |
42 if (signals.medium() == AUDIO_ULTRASOUND_PASSBAND || | |
43 signals.medium() == AUDIO_AUDIBLE_DTMF) | |
44 return true; | |
45 } | |
46 return false; | |
47 } | |
48 | |
49 | |
50 // Public functions. | |
51 | |
52 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) | |
53 : delegate_(delegate), | |
54 whispernet_init_callback_( | |
55 base::Bind(&CopresenceManagerImpl::WhispernetInitComplete, | |
56 // This callback gets cancelled when we are destroyed. | |
57 base::Unretained(this))), | |
58 init_failed_(false), | |
59 state_(new CopresenceStateImpl), | |
60 directive_handler_(new DirectiveHandlerImpl( | |
61 // The directive handler and its descendants | |
62 // will be destructed before the CopresenceState instance. | |
63 base::Bind(&CopresenceStateImpl::UpdateDirectives, | |
64 base::Unretained(state_.get())))), | |
65 poll_timer_(new base::RepeatingTimer), | |
66 audio_check_timer_(new base::RepeatingTimer), | |
67 queued_messages_by_token_( | |
68 base::TimeDelta::FromSeconds(kQueuedMessageTimeout), | |
69 kMaxQueuedMessages) { | |
70 DCHECK(delegate_); | |
71 DCHECK(delegate_->GetWhispernetClient()); | |
72 // TODO(ckehoe): Handle whispernet initialization in the whispernet component. | |
73 delegate_->GetWhispernetClient()->Initialize( | |
74 whispernet_init_callback_.callback()); | |
75 | |
76 MessagesCallback messages_callback = base::Bind( | |
77 &CopresenceManagerImpl::DispatchMessages, | |
78 // This will only be passed to objects that we own. | |
79 base::Unretained(this)); | |
80 | |
81 if (delegate->GetGCMDriver()) | |
82 gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(), | |
83 directive_handler_.get(), | |
84 messages_callback)); | |
85 | |
86 rpc_handler_.reset(new RpcHandler(delegate, | |
87 directive_handler_.get(), | |
88 state_.get(), | |
89 gcm_handler_.get(), | |
90 messages_callback)); | |
91 | |
92 directive_handler_->Start(delegate_->GetWhispernetClient(), | |
93 base::Bind(&CopresenceManagerImpl::ReceivedTokens, | |
94 base::Unretained(this))); | |
95 } | |
96 | |
97 CopresenceManagerImpl::~CopresenceManagerImpl() { | |
98 whispernet_init_callback_.Cancel(); | |
99 } | |
100 | |
101 CopresenceState* CopresenceManagerImpl::state() { | |
102 return state_.get(); | |
103 } | |
104 | |
105 // Returns false if any operations were malformed. | |
106 void CopresenceManagerImpl::ExecuteReportRequest( | |
107 const ReportRequest& request, | |
108 const std::string& app_id, | |
109 const std::string& auth_token, | |
110 const StatusCallback& callback) { | |
111 // If initialization has failed, reject all requests. | |
112 if (init_failed_) { | |
113 callback.Run(FAIL); | |
114 return; | |
115 } | |
116 | |
117 // We'll need to modify the ReportRequest, so we make our own copy to send. | |
118 std::unique_ptr<ReportRequest> request_copy(new ReportRequest(request)); | |
119 rpc_handler_->SendReportRequest(std::move(request_copy), app_id, auth_token, | |
120 callback); | |
121 } | |
122 | |
123 | |
124 // Private functions. | |
125 | |
126 void CopresenceManagerImpl::WhispernetInitComplete(bool success) { | |
127 if (success) { | |
128 DVLOG(3) << "Whispernet initialized successfully."; | |
129 poll_timer_->Start(FROM_HERE, | |
130 base::TimeDelta::FromMilliseconds(kPollTimerIntervalMs), | |
131 base::Bind(&CopresenceManagerImpl::PollForMessages, | |
132 base::Unretained(this))); | |
133 audio_check_timer_->Start( | |
134 FROM_HERE, base::TimeDelta::FromMilliseconds(kAudioCheckIntervalMs), | |
135 base::Bind(&CopresenceManagerImpl::AudioCheck, base::Unretained(this))); | |
136 } else { | |
137 LOG(ERROR) << "Whispernet initialization failed!"; | |
138 init_failed_ = true; | |
139 } | |
140 } | |
141 | |
142 void CopresenceManagerImpl::ReceivedTokens( | |
143 const std::vector<AudioToken>& tokens) { | |
144 rpc_handler_->ReportTokens(tokens); | |
145 | |
146 for (const AudioToken audio_token : tokens) { | |
147 const std::string& token_id = audio_token.token; | |
148 DVLOG(3) << "Heard token: " << token_id; | |
149 | |
150 // Update the CopresenceState. | |
151 ReceivedToken token( | |
152 token_id, | |
153 audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND, | |
154 base::Time::Now()); | |
155 state_->UpdateReceivedToken(token); | |
156 | |
157 // Deliver messages that were pre-sent on this token. | |
158 if (queued_messages_by_token_.HasKey(token_id)) { | |
159 // Not const because we have to remove the required tokens for delivery. | |
160 // We're going to delete this whole vector at the end anyway. | |
161 RepeatedPtrField<SubscribedMessage>* messages = | |
162 queued_messages_by_token_.GetMutableValue(token_id); | |
163 DCHECK_GT(messages->size(), 0) | |
164 << "Empty entry in queued_messages_by_token_"; | |
165 | |
166 // These messages still have their required tokens stored, and | |
167 // DispatchMessages() will still check for them. If we don't remove | |
168 // the tokens before delivery, we'll just end up re-queuing the message. | |
169 for (SubscribedMessage& message : *messages) | |
170 message.mutable_required_token()->Clear(); | |
171 | |
172 DVLOG(3) << "Delivering " << messages->size() | |
173 << " message(s) pre-sent on token " << token_id; | |
174 DispatchMessages(*messages); | |
175 | |
176 // The messages have been delivered, so we don't need to keep them | |
177 // in the queue. Note that the token will still be reported | |
178 // to the server (above), so we'll keep getting the message. | |
179 // But we can now drop our local copy of it. | |
180 int erase_count = queued_messages_by_token_.Erase(token_id); | |
181 DCHECK_GT(erase_count, 0); | |
182 } | |
183 } | |
184 } | |
185 | |
186 void CopresenceManagerImpl::AudioCheck() { | |
187 if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && | |
188 !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { | |
189 delegate_->HandleStatusUpdate(AUDIO_FAIL); | |
190 } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && | |
191 !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { | |
192 delegate_->HandleStatusUpdate(AUDIO_FAIL); | |
193 } | |
194 } | |
195 | |
196 // Report our currently playing tokens to the server. | |
197 void CopresenceManagerImpl::PollForMessages() { | |
198 const std::string& audible_token = | |
199 directive_handler_->GetCurrentAudioToken(AUDIBLE); | |
200 const std::string& inaudible_token = | |
201 directive_handler_->GetCurrentAudioToken(INAUDIBLE); | |
202 | |
203 std::vector<AudioToken> tokens; | |
204 if (!audible_token.empty()) | |
205 tokens.push_back(AudioToken(audible_token, true)); | |
206 if (!inaudible_token.empty()) | |
207 tokens.push_back(AudioToken(inaudible_token, false)); | |
208 | |
209 if (!tokens.empty()) | |
210 rpc_handler_->ReportTokens(tokens); | |
211 } | |
212 | |
213 void CopresenceManagerImpl::DispatchMessages( | |
214 const RepeatedPtrField<SubscribedMessage>& messages) { | |
215 if (messages.size() == 0) | |
216 return; | |
217 | |
218 // Index the messages by subscription id. | |
219 std::map<std::string, std::vector<Message>> messages_by_subscription; | |
220 DVLOG(3) << "Processing " << messages.size() << " received message(s)."; | |
221 int immediate_message_count = 0; | |
222 for (const SubscribedMessage& message : messages) { | |
223 // If tokens are required for this message, queue it. | |
224 // Otherwise stage it for delivery. | |
225 if (message.required_token_size() > 0) { | |
226 int supported_token_count = 0; | |
227 for (const TokenObservation& token : message.required_token()) { | |
228 if (SupportedTokenMedium(token)) { | |
229 if (!queued_messages_by_token_.HasKey(token.token_id())) { | |
230 queued_messages_by_token_.Add( | |
231 token.token_id(), RepeatedPtrField<SubscribedMessage>()); | |
232 } | |
233 RepeatedPtrField<SubscribedMessage>* queued_messages = | |
234 queued_messages_by_token_.GetMutableValue(token.token_id()); | |
235 DCHECK(queued_messages); | |
236 queued_messages->Add()->CopyFrom(message); | |
237 supported_token_count++; | |
238 } | |
239 } | |
240 | |
241 if (supported_token_count > 0) { | |
242 DVLOG(3) << "Queued message under " << supported_token_count | |
243 << "token(s)."; | |
244 } else { | |
245 VLOG(2) << "Discarded message that requires one of " | |
246 << message.required_token_size() | |
247 << " token(s), all on unsupported mediums."; | |
248 } | |
249 } else { | |
250 immediate_message_count++; | |
251 for (const std::string& subscription_id : message.subscription_id()) { | |
252 messages_by_subscription[subscription_id].push_back( | |
253 message.published_message()); | |
254 } | |
255 } | |
256 } | |
257 | |
258 // Send the messages for each subscription. | |
259 DVLOG(3) << "Dispatching " << immediate_message_count << "message(s) for " | |
260 << messages_by_subscription.size() << " subscription(s)."; | |
261 for (const auto& map_entry : messages_by_subscription) { | |
262 // TODO(ckehoe): Once we have the app ID from the server, we need to pass | |
263 // it in here and get rid of the app id registry from the main API class. | |
264 const std::string& subscription = map_entry.first; | |
265 const std::vector<Message>& messages = map_entry.second; | |
266 delegate_->HandleMessages(std::string(), subscription, messages); | |
267 } | |
268 } | |
269 | |
270 } // namespace copresence | |
OLD | NEW |