Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(222)

Side by Side Diff: components/copresence/copresence_manager_impl.cc

Issue 813553002: Adding support for pre-sent messages (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@state
Patch Set: Switching to TimedMap Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/copresence/copresence_manager_impl.h" 5 #include "components/copresence/copresence_manager_impl.h"
6 6
7 #include <map>
7 #include <vector> 8 #include <vector>
8 9
9 #include "base/bind.h" 10 #include "base/bind.h"
10 #include "base/strings/stringprintf.h" 11 #include "base/strings/stringprintf.h"
11 #include "base/time/time.h" 12 #include "base/time/time.h"
12 #include "base/timer/timer.h" 13 #include "base/timer/timer.h"
13 #include "components/copresence/copresence_state_impl.h" 14 #include "components/copresence/copresence_state_impl.h"
14 #include "components/copresence/handlers/directive_handler_impl.h" 15 #include "components/copresence/handlers/directive_handler_impl.h"
15 #include "components/copresence/handlers/gcm_handler_impl.h" 16 #include "components/copresence/handlers/gcm_handler_impl.h"
16 #include "components/copresence/proto/rpcs.pb.h" 17 #include "components/copresence/proto/rpcs.pb.h"
17 #include "components/copresence/public/whispernet_client.h" 18 #include "components/copresence/public/whispernet_client.h"
18 #include "components/copresence/rpc/rpc_handler.h" 19 #include "components/copresence/rpc/rpc_handler.h"
19 20
20 namespace { 21 namespace {
21 22
22 const int kPollTimerIntervalMs = 3000; // milliseconds. 23 const int kPollTimerIntervalMs = 3000; // milliseconds.
23 const int kAudioCheckIntervalMs = 1000; // milliseconds. 24 const int kAudioCheckIntervalMs = 1000; // milliseconds.
24 25
26 const int kQueuedMessageTimeout = 10; // seconds.
27 const int kMaxQueuedMessages = 1000;
28
25 } // namespace 29 } // namespace
26 30
27 namespace copresence { 31 namespace copresence {
28 32
33 bool SupportedTokenMedium(const TokenObservation& token) {
34 for (const TokenSignals& signals : token.signals()) {
35 if (signals.medium() == AUDIO_ULTRASOUND_PASSBAND ||
36 signals.medium() == AUDIO_AUDIBLE_DTMF)
37 return true;
38 }
39 return false;
40 }
41
42
29 // Public functions. 43 // Public functions.
30 44
31 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) 45 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate)
32 : delegate_(delegate), 46 : delegate_(delegate),
33 whispernet_init_callback_(base::Bind( 47 whispernet_init_callback_(base::Bind(
34 &CopresenceManagerImpl::WhispernetInitComplete, 48 &CopresenceManagerImpl::WhispernetInitComplete,
35 // This callback gets cancelled when we are destroyed. 49 // This callback gets cancelled when we are destroyed.
36 base::Unretained(this))), 50 base::Unretained(this))),
37 init_failed_(false), 51 init_failed_(false),
38 state_(new CopresenceStateImpl), 52 state_(new CopresenceStateImpl),
39 directive_handler_(new DirectiveHandlerImpl( 53 directive_handler_(new DirectiveHandlerImpl(
40 // The directive handler and its descendants 54 // The directive handler and its descendants
41 // will be destructed before the CopresenceState instance. 55 // will be destructed before the CopresenceState instance.
42 base::Bind(&CopresenceStateImpl::UpdateDirectives, 56 base::Bind(&CopresenceStateImpl::UpdateDirectives,
43 base::Unretained(state_.get())))), 57 base::Unretained(state_.get())))),
44 poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), 58 poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>),
45 audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>) { 59 audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>),
60 queued_messages_by_token_(
61 base::TimeDelta::FromSeconds(kQueuedMessageTimeout),
62 kMaxQueuedMessages) {
46 DCHECK(delegate_); 63 DCHECK(delegate_);
47 DCHECK(delegate_->GetWhispernetClient()); 64 DCHECK(delegate_->GetWhispernetClient());
48 delegate_->GetWhispernetClient()->Initialize( 65 delegate_->GetWhispernetClient()->Initialize(
49 whispernet_init_callback_.callback()); 66 whispernet_init_callback_.callback());
50 67
68 MessagesCallback messages_callback = base::Bind(
69 &CopresenceManagerImpl::DispatchMessages<
70 google::protobuf::RepeatedPtrField<SubscribedMessage>>,
71 // This will only be passed to objects that we own.
72 base::Unretained(this));
73
51 if (delegate->GetGCMDriver()) 74 if (delegate->GetGCMDriver())
52 gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(), 75 gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(),
53 directive_handler_.get())); 76 directive_handler_.get(),
77 messages_callback));
54 78
55 rpc_handler_.reset(new RpcHandler(delegate, 79 rpc_handler_.reset(new RpcHandler(delegate,
56 state_.get(), 80 state_.get(),
57 directive_handler_.get(), 81 directive_handler_.get(),
58 gcm_handler_.get())); 82 gcm_handler_.get(),
83 messages_callback));
59 } 84 }
60 85
61 CopresenceManagerImpl::~CopresenceManagerImpl() { 86 CopresenceManagerImpl::~CopresenceManagerImpl() {
62 whispernet_init_callback_.Cancel(); 87 whispernet_init_callback_.Cancel();
63 } 88 }
64 89
65 CopresenceState* CopresenceManagerImpl::state() { 90 CopresenceState* CopresenceManagerImpl::state() {
66 return state_.get(); 91 return state_.get();
67 } 92 }
68 93
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
106 } else { 131 } else {
107 LOG(ERROR) << "Whispernet initialization failed!"; 132 LOG(ERROR) << "Whispernet initialization failed!";
108 init_failed_ = true; 133 init_failed_ = true;
109 } 134 }
110 } 135 }
111 136
112 void CopresenceManagerImpl::ReceivedTokens( 137 void CopresenceManagerImpl::ReceivedTokens(
113 const std::vector<AudioToken>& tokens) { 138 const std::vector<AudioToken>& tokens) {
114 rpc_handler_->ReportTokens(tokens); 139 rpc_handler_->ReportTokens(tokens);
115 140
116 // Update the CopresenceState.
117 for (const AudioToken audio_token : tokens) { 141 for (const AudioToken audio_token : tokens) {
118 DVLOG(3) << "Heard token: " << audio_token.token; 142 const std::string& token_id = audio_token.token;
143 DVLOG(3) << "Heard token: " << token_id;
144
145 // Update the CopresenceState.
119 ReceivedToken token( 146 ReceivedToken token(
120 audio_token.token, 147 token_id,
121 audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND, 148 audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND,
122 base::Time::Now()); 149 base::Time::Now());
123 state_->UpdateReceivedToken(token); 150 state_->UpdateReceivedToken(token);
151
152 // Deliver messages that were pre-sent on this token.
153 if (queued_messages_by_token_.HasKey(token_id)) {
154 // Not const because we have to remove the required tokens for delivery.
155 // We're going to delete this whole vector at the end anyway.
156 std::vector<SubscribedMessage>* messages =
157 queued_messages_by_token_.GetMutableValue(token_id);
158 DCHECK(!messages->empty()) << "Empty entry in queued_messages_by_token_";
159
160 // These messages still have their required tokens stored, and
161 // DispatchMessages() will still check for them. If we don't remove
162 // the tokens before delivery, we'll just end up re-queuing the message.
163 for (SubscribedMessage& message : *messages)
164 message.mutable_required_token()->Clear();
165
166 DVLOG(3) << "Delivering " << messages->size()
167 << " message(s) pre-sent on token " << token_id;
168 DispatchMessages(*messages);
169
170 // The messages have been delivered, so we don't need to keep them
171 // in the queue. Note that the token will still be reported
172 // to the server (above), so we'll keep getting the message.
173 // But we can now drop our local copy of it.
174 int erase_count = queued_messages_by_token_.Erase(token_id);
175 DCHECK_GT(erase_count, 0);
176 }
124 } 177 }
125 } 178 }
126 179
180 void CopresenceManagerImpl::AudioCheck() {
181 if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() &&
182 !directive_handler_->IsAudioTokenHeard(AUDIBLE)) {
183 delegate_->HandleStatusUpdate(AUDIO_FAIL);
184 } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() &&
185 !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) {
186 delegate_->HandleStatusUpdate(AUDIO_FAIL);
187 }
188 }
189
190 // Report our currently playing tokens to the server.
127 void CopresenceManagerImpl::PollForMessages() { 191 void CopresenceManagerImpl::PollForMessages() {
128 // Report our currently playing tokens.
129 const std::string& audible_token = 192 const std::string& audible_token =
130 directive_handler_->GetCurrentAudioToken(AUDIBLE); 193 directive_handler_->GetCurrentAudioToken(AUDIBLE);
131 const std::string& inaudible_token = 194 const std::string& inaudible_token =
132 directive_handler_->GetCurrentAudioToken(INAUDIBLE); 195 directive_handler_->GetCurrentAudioToken(INAUDIBLE);
133 196
134 std::vector<AudioToken> tokens; 197 std::vector<AudioToken> tokens;
135 if (!audible_token.empty()) 198 if (!audible_token.empty())
136 tokens.push_back(AudioToken(audible_token, true)); 199 tokens.push_back(AudioToken(audible_token, true));
137 if (!inaudible_token.empty()) 200 if (!inaudible_token.empty())
138 tokens.push_back(AudioToken(inaudible_token, false)); 201 tokens.push_back(AudioToken(inaudible_token, false));
139 202
140 if (!tokens.empty()) 203 if (!tokens.empty())
141 rpc_handler_->ReportTokens(tokens); 204 rpc_handler_->ReportTokens(tokens);
142 } 205 }
143 206
144 void CopresenceManagerImpl::AudioCheck() { 207 template<typename IterableSubscribedMessages>
145 if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() && 208 void CopresenceManagerImpl::DispatchMessages(
146 !directive_handler_->IsAudioTokenHeard(AUDIBLE)) { 209 const IterableSubscribedMessages& messages) {
147 delegate_->HandleStatusUpdate(AUDIO_FAIL); 210 if (messages.size() == 0)
148 } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() && 211 return;
149 !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) { 212
150 delegate_->HandleStatusUpdate(AUDIO_FAIL); 213 // Index the messages by subscription id.
214 std::map<std::string, std::vector<Message>> messages_by_subscription;
215 DVLOG(3) << "Processing " << messages.size() << " received message(s).";
216 int immediate_message_count = 0;
217 for (const SubscribedMessage& message : messages) {
218 // If tokens are required for this message, queue it.
219 // Otherwise stage it for delivery.
220 if (message.required_token_size() > 0) {
221 int supported_token_count = 0;
222 for (const TokenObservation& token : message.required_token()) {
223 if (SupportedTokenMedium(token)) {
224 queued_messages_by_token_.GetMutableValue(token.token_id())
225 ->push_back(message);
226 supported_token_count++;
227 }
228 }
229
230 if (supported_token_count > 0) {
231 DVLOG(3) << "Queued message under " << supported_token_count
232 << "token(s).";
233 } else {
234 VLOG(2) << "Discarded message that requires one of "
235 << message.required_token_size()
236 << " token(s), all on unsupported mediums.";
237 }
238 } else {
239 immediate_message_count++;
240 for (const std::string& subscription_id : message.subscription_id()) {
241 messages_by_subscription[subscription_id].push_back(
242 message.published_message());
243 }
244 }
245 }
246
247 // Send the messages for each subscription.
248 DVLOG(3) << "Dispatching " << immediate_message_count << "message(s) for "
249 << messages_by_subscription.size() << " subscription(s).";
250 for (const auto& map_entry : messages_by_subscription) {
251 // TODO(ckehoe): Once we have the app ID from the server, we need to pass
252 // it in here and get rid of the app id registry from the main API class.
253 const std::string& subscription = map_entry.first;
254 const std::vector<Message>& messages = map_entry.second;
255 delegate_->HandleMessages(std::string(), subscription, messages);
151 } 256 }
152 } 257 }
153 258
154 } // namespace copresence 259 } // namespace copresence
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698