OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "components/copresence/copresence_manager_impl.h" | 5 #include "components/copresence/copresence_manager_impl.h" |
6 | 6 |
7 #include <vector> | 7 #include <vector> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/strings/stringprintf.h" | 10 #include "base/strings/stringprintf.h" |
11 #include "base/time/time.h" | 11 #include "base/time/time.h" |
12 #include "base/timer/timer.h" | 12 #include "base/timer/timer.h" |
13 #include "components/copresence/copresence_state_impl.h" | 13 #include "components/copresence/copresence_state_impl.h" |
14 #include "components/copresence/handlers/directive_handler_impl.h" | 14 #include "components/copresence/handlers/directive_handler_impl.h" |
15 #include "components/copresence/handlers/gcm_handler_impl.h" | 15 #include "components/copresence/handlers/gcm_handler_impl.h" |
16 #include "components/copresence/proto/rpcs.pb.h" | 16 #include "components/copresence/proto/rpcs.pb.h" |
17 #include "components/copresence/public/whispernet_client.h" | 17 #include "components/copresence/public/whispernet_client.h" |
18 #include "components/copresence/rpc/rpc_handler.h" | 18 #include "components/copresence/rpc/rpc_handler.h" |
19 | 19 |
20 namespace { | 20 namespace { |
21 | 21 |
22 const int kPollTimerIntervalMs = 3000; // milliseconds. | 22 const int kPollTimerIntervalMs = 3000; // milliseconds. |
23 const int kAudioCheckIntervalMs = 1000; // milliseconds. | 23 const int kAudioCheckIntervalMs = 1000; // milliseconds. |
24 | 24 |
25 } // namespace | 25 } // namespace |
26 | 26 |
27 namespace copresence { | 27 namespace copresence { |
28 | 28 |
29 bool SupportedTokenMedium(const TokenObservation& token) { | |
30 for (const TokenSignals& signals : token.signals()) { | |
31 if (signals.medium() == AUDIO_ULTRASOUND_PASSBAND || | |
32 signals.medium() == AUDIO_AUDIBLE_DTMF) | |
33 return true; | |
34 } | |
35 return false; | |
36 } | |
37 | |
38 | |
29 // Public functions. | 39 // Public functions. |
30 | 40 |
31 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) | 41 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) |
32 : delegate_(delegate), | 42 : delegate_(delegate), |
33 whispernet_init_callback_(base::Bind( | 43 whispernet_init_callback_(base::Bind( |
34 &CopresenceManagerImpl::WhispernetInitComplete, | 44 &CopresenceManagerImpl::WhispernetInitComplete, |
35 // This callback gets cancelled when we are destroyed. | 45 // This callback gets cancelled when we are destroyed. |
36 base::Unretained(this))), | 46 base::Unretained(this))), |
37 init_failed_(false), | 47 init_failed_(false), |
38 state_(new CopresenceStateImpl), | 48 state_(new CopresenceStateImpl), |
39 directive_handler_(new DirectiveHandlerImpl( | 49 directive_handler_(new DirectiveHandlerImpl( |
40 // The directive handler and its descendants | 50 // The directive handler and its descendants |
41 // will be destructed before the CopresenceState instance. | 51 // will be destructed before the CopresenceState instance. |
42 base::Bind(&CopresenceStateImpl::UpdateDirectives, | 52 base::Bind(&CopresenceStateImpl::UpdateDirectives, |
43 base::Unretained(state_.get())))), | 53 base::Unretained(state_.get())))), |
44 poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), | 54 poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), |
45 audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>) { | 55 audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>) { |
46 DCHECK(delegate_); | 56 DCHECK(delegate_); |
47 DCHECK(delegate_->GetWhispernetClient()); | 57 DCHECK(delegate_->GetWhispernetClient()); |
48 delegate_->GetWhispernetClient()->Initialize( | 58 delegate_->GetWhispernetClient()->Initialize( |
49 whispernet_init_callback_.callback()); | 59 whispernet_init_callback_.callback()); |
50 | 60 |
61 MessagesCallback messages_callback = base::Bind( | |
62 &CopresenceManagerImpl::DispatchMessages< | |
63 google::protobuf::RepeatedPtrField<SubscribedMessage>>, | |
64 base::Unretained(this)); | |
rkc
2015/01/07 21:31:43
Add a one liner about why unretained is safe here.
Charlie
2015/01/07 22:06:07
Done.
| |
65 | |
51 if (delegate->GetGCMDriver()) | 66 if (delegate->GetGCMDriver()) |
52 gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(), | 67 gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(), |
53 directive_handler_.get())); | 68 directive_handler_.get(), |
69 messages_callback)); | |
54 | 70 |
55 rpc_handler_.reset(new RpcHandler(delegate, | 71 rpc_handler_.reset(new RpcHandler(delegate, |
56 state_.get(), | 72 state_.get(), |
57 directive_handler_.get(), | 73 directive_handler_.get(), |
58 gcm_handler_.get())); | 74 gcm_handler_.get(), |
75 messages_callback)); | |
59 } | 76 } |
60 | 77 |
61 CopresenceManagerImpl::~CopresenceManagerImpl() { | 78 CopresenceManagerImpl::~CopresenceManagerImpl() { |
62 whispernet_init_callback_.Cancel(); | 79 whispernet_init_callback_.Cancel(); |
63 } | 80 } |
64 | 81 |
65 CopresenceState* CopresenceManagerImpl::state() { | 82 CopresenceState* CopresenceManagerImpl::state() { |
66 return state_.get(); | 83 return state_.get(); |
67 } | 84 } |
68 | 85 |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
106 } else { | 123 } else { |
107 LOG(ERROR) << "Whispernet initialization failed!"; | 124 LOG(ERROR) << "Whispernet initialization failed!"; |
108 init_failed_ = true; | 125 init_failed_ = true; |
109 } | 126 } |
110 } | 127 } |
111 | 128 |
112 void CopresenceManagerImpl::ReceivedTokens( | 129 void CopresenceManagerImpl::ReceivedTokens( |
113 const std::vector<AudioToken>& tokens) { | 130 const std::vector<AudioToken>& tokens) { |
114 rpc_handler_->ReportTokens(tokens); | 131 rpc_handler_->ReportTokens(tokens); |
115 | 132 |
116 // Update the CopresenceState. | |
117 for (const AudioToken audio_token : tokens) { | 133 for (const AudioToken audio_token : tokens) { |
118 DVLOG(3) << "Heard token: " << audio_token.token; | 134 const std::string& token_id = audio_token.token; |
135 DVLOG(3) << "Heard token: " << token_id; | |
136 | |
137 // Update the CopresenceState. | |
119 ReceivedToken token( | 138 ReceivedToken token( |
120 audio_token.token, | 139 token_id, |
121 audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND, | 140 audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND, |
122 base::Time::Now()); | 141 base::Time::Now()); |
123 state_->UpdateReceivedToken(token); | 142 state_->UpdateReceivedToken(token); |
143 | |
144 // Deliver messages that were pre-sent on this token. | |
145 if (queued_messages_by_token_.count(token_id) > 0) { | |
146 // Not const because we have to remove the required tokens for delivery. | |
147 // We're going to delete this whole vector at the end anyway. | |
148 std::vector<SubscribedMessage>& messages = | |
149 queued_messages_by_token_[token_id]; | |
150 DCHECK(!messages.empty()) << "Empty entry in queued_messages_by_token_"; | |
151 | |
152 // These messages still have their required tokens stored, and | |
153 // DispatchMessages() will still check for them. If we don't remove | |
154 // the tokens before delivery, we'll just end up re-queuing the message. | |
155 for (SubscribedMessage& message : messages) | |
156 message.mutable_required_token()->Clear(); | |
157 | |
158 DVLOG(3) << "Delivering " << messages.size() | |
159 << " message(s) pre-sent on token " << token_id; | |
160 DispatchMessages(messages); | |
161 | |
162 int erase_count = queued_messages_by_token_.erase(token_id); | |
rkc
2015/01/07 21:31:43
Is this the behavior we want? Currently for non-pr
Charlie
2015/01/07 22:06:07
The token will still be reported to the server and
| |
163 DCHECK_GT(erase_count, 0); | |
164 } | |
124 } | 165 } |
125 } | 166 } |
126 | 167 |
168 void CopresenceManagerImpl::AudioCheck() { | |
169 if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && | |
170 !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { | |
171 delegate_->HandleStatusUpdate(AUDIO_FAIL); | |
172 } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && | |
173 !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { | |
174 delegate_->HandleStatusUpdate(AUDIO_FAIL); | |
175 } | |
176 } | |
177 | |
178 // Report our currently playing tokens to the server. | |
127 void CopresenceManagerImpl::PollForMessages() { | 179 void CopresenceManagerImpl::PollForMessages() { |
128 // Report our currently playing tokens. | |
129 const std::string& audible_token = | 180 const std::string& audible_token = |
130 directive_handler_->GetCurrentAudioToken(AUDIBLE); | 181 directive_handler_->GetCurrentAudioToken(AUDIBLE); |
131 const std::string& inaudible_token = | 182 const std::string& inaudible_token = |
132 directive_handler_->GetCurrentAudioToken(INAUDIBLE); | 183 directive_handler_->GetCurrentAudioToken(INAUDIBLE); |
133 | 184 |
134 std::vector<AudioToken> tokens; | 185 std::vector<AudioToken> tokens; |
135 if (!audible_token.empty()) | 186 if (!audible_token.empty()) |
136 tokens.push_back(AudioToken(audible_token, true)); | 187 tokens.push_back(AudioToken(audible_token, true)); |
137 if (!inaudible_token.empty()) | 188 if (!inaudible_token.empty()) |
138 tokens.push_back(AudioToken(inaudible_token, false)); | 189 tokens.push_back(AudioToken(inaudible_token, false)); |
139 | 190 |
140 if (!tokens.empty()) | 191 if (!tokens.empty()) |
141 rpc_handler_->ReportTokens(tokens); | 192 rpc_handler_->ReportTokens(tokens); |
142 } | 193 } |
143 | 194 |
144 void CopresenceManagerImpl::AudioCheck() { | 195 template<typename IterableSubscribedMessages> |
145 if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && | 196 void CopresenceManagerImpl::DispatchMessages( |
146 !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { | 197 const IterableSubscribedMessages& messages) { |
147 delegate_->HandleStatusUpdate(AUDIO_FAIL); | 198 if (messages.size() == 0) |
148 } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && | 199 return; |
149 !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { | 200 |
150 delegate_->HandleStatusUpdate(AUDIO_FAIL); | 201 // Index the messages by subscription id. |
202 std::map<std::string, std::vector<Message>> messages_by_subscription; | |
203 DVLOG(3) << "Processing " << messages.size() << " received message(s)."; | |
204 int immediate_message_count = 0; | |
205 for (const SubscribedMessage& message : messages) { | |
206 // If tokens are required for this message, queue it. | |
207 // Otherwise stage it for delivery. | |
208 if (message.required_token_size() > 0) { | |
209 int supported_token_count = 0; | |
210 for (const TokenObservation& token : message.required_token()) { | |
211 if (SupportedTokenMedium(token)) { | |
212 queued_messages_by_token_[token.token_id()].push_back(message); | |
213 supported_token_count++; | |
214 } | |
215 } | |
216 | |
217 if (supported_token_count > 0) { | |
218 DVLOG(3) << "Queued message under " << supported_token_count | |
219 << "token(s)."; | |
220 } else { | |
221 VLOG(2) << "Discarded message that requires one of " | |
222 << message.required_token_size() | |
223 << " token(s), all on unsupported mediums."; | |
224 } | |
225 } else { | |
226 immediate_message_count++; | |
227 for (const std::string& subscription_id : message.subscription_id()) { | |
228 messages_by_subscription[subscription_id].push_back( | |
229 message.published_message()); | |
230 } | |
231 } | |
232 } | |
233 | |
234 // Send the messages for each subscription. | |
235 DVLOG(3) << "Dispatching " << immediate_message_count << "message(s) for " | |
236 << messages_by_subscription.size() << " subscription(s)."; | |
237 for (const auto& map_entry : messages_by_subscription) { | |
238 // TODO(ckehoe): Once we have the app ID from the server, we need to pass | |
239 // it in here and get rid of the app id registry from the main API class. | |
240 const std::string& subscription = map_entry.first; | |
241 const std::vector<Message>& messages = map_entry.second; | |
242 delegate_->HandleMessages(std::string(), subscription, messages); | |
151 } | 243 } |
152 } | 244 } |
153 | 245 |
154 } // namespace copresence | 246 } // namespace copresence |
OLD | NEW |