OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license |
3 // found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 #include "components/copresence/rpc/rpc_handler.h" | 5 #include "components/copresence/rpc/rpc_handler.h" |
6 | 6 |
7 #include <map> | |
8 #include <sstream> | |
9 | |
7 #include "base/bind.h" | 10 #include "base/bind.h" |
11 #include "base/guid.h" | |
12 #include "base/logging.h" | |
13 #include "base/strings/string_util.h" | |
14 #include "base/time/time.h" | |
15 #include "components/copresence/handlers/directive_handler.h" | |
16 #include "components/copresence/proto/codes.pb.h" | |
8 #include "components/copresence/proto/data.pb.h" | 17 #include "components/copresence/proto/data.pb.h" |
9 #include "components/copresence/proto/rpcs.pb.h" | 18 #include "components/copresence/proto/rpcs.pb.h" |
10 #include "components/copresence/public/copresence_client_delegate.h" | 19 #include "components/copresence/public/copresence_client_delegate.h" |
11 #include "components/copresence/public/whispernet_client.h" | 20 #include "net/http/http_status_code.h" |
21 | |
22 // TODO(ckehoe): Return error messages for bad requests. | |
12 | 23 |
13 namespace copresence { | 24 namespace copresence { |
14 | 25 |
15 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate, | 26 using google::protobuf::MessageLite; |
16 SuccessCallback init_done_callback) { | 27 using google::protobuf::RepeatedPtrField; |
17 } | 28 |
18 | 29 namespace { |
19 RpcHandler::~RpcHandler() { | 30 |
20 } | 31 // UrlSafe is defined as: |
21 | 32 // '/' represented by a '_' and '+' represented by a '-' |
22 void RpcHandler::SendReportRequest( | 33 // TODO(rkc): Move this to the wrapper. |
Daniel Erat
2014/08/06 16:01:17
if this is duplicated now (i know i saw it earlier
Charlie
2014/08/06 19:32:19
I have his CLs patched in, but a grep of my tree d
Daniel Erat
2014/08/06 21:35:18
yeah, that's probably what i'm thinking of.
Charlie
2014/08/06 22:36:23
Acknowledged.
| |
23 scoped_ptr<copresence::ReportRequest> request) { | 34 std::string ToUrlSafe(std::string token) { |
24 } | 35 base::ReplaceChars(token, "+", "-", &token); |
25 | 36 base::ReplaceChars(token, "/", "_", &token); |
26 void RpcHandler::SendReportRequest( | 37 return token; |
27 scoped_ptr<copresence::ReportRequest> request, | 38 } |
39 | |
40 const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes. | |
41 const int kMaxInvalidTokens = 10000; | |
42 | |
43 const char kRegisterDeviceRpcName[] = "registerdevice"; | |
44 const char kReportRequestRpcName[] = "report"; | |
Daniel Erat
2014/08/06 16:01:16
make this be a public static const member so it is
Charlie
2014/08/06 19:32:19
Done.
| |
45 | |
46 // Logging | |
47 | |
48 // Logs the status and returns true if the status was OK. | |
Daniel Erat
2014/08/06 16:01:16
this looks like it only logs the status if it was
Charlie
2014/08/06 19:32:19
Done.
| |
49 bool LogStatus(const Status& status) { | |
50 if (status.code() == OK) | |
51 return true; | |
52 | |
53 std::stringstream log_message; | |
Daniel Erat
2014/08/06 16:01:16
the style guide says not to use streams except for
Charlie
2014/08/06 19:32:19
Sure. Done.
On 2014/08/06 16:01:16, Daniel Erat w
| |
54 log_message << "Copresence error code " << status.code(); | |
55 if (!status.message().empty()) { | |
56 log_message << ": " << status.message(); | |
57 } | |
58 | |
59 LOG(ERROR) << log_message.str(); | |
60 return false; | |
61 } | |
62 | |
63 void LogStatus(const util::error::Code& code, const std::string& context) { | |
Daniel Erat
2014/08/06 16:01:16
don't overload function names
Charlie
2014/08/06 19:32:20
Done.
| |
64 LOG_IF(ERROR, code != util::error::OK) | |
65 << context << " error " << code << ". See " | |
66 << "cs/google3/util/task/codes.proto for more info."; | |
67 } | |
68 | |
69 // Logs the status and returns true if the status was OK. | |
70 bool LogStatus(const ReportResponse& response) { | |
Daniel Erat
2014/08/06 16:01:17
don't overload function names
Charlie
2014/08/06 19:32:19
Done.
| |
71 bool result = LogStatus(response.header().status()); | |
72 if (response.has_manage_messages_response()) { | |
Daniel Erat
2014/08/06 16:01:16
omit curly brackets for single-line statements
Charlie
2014/08/06 19:32:19
Done.
| |
73 LogStatus(response.manage_messages_response().status(), "Publish"); | |
74 } | |
75 if (response.has_manage_subscriptions_response()) { | |
76 LogStatus(response.manage_subscriptions_response().status(), "Subscribe"); | |
77 } | |
78 if (response.has_update_signals_response()) { | |
79 LogStatus(response.update_signals_response().status(), "Update"); | |
80 } | |
81 return result; | |
82 } | |
83 | |
84 // Request construction | |
Daniel Erat
2014/08/06 16:01:16
nit: add trailing period
Charlie
2014/08/06 19:32:19
This is supposed to be a section header. Added a n
| |
85 template <typename T> | |
86 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) { | |
87 if (msg.has_token_exchange_strategy() && | |
88 msg.token_exchange_strategy().has_broadcast_scan_configuration()) { | |
89 return msg.token_exchange_strategy().broadcast_scan_configuration(); | |
90 } | |
91 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN; | |
92 } | |
93 | |
94 // This method will extract token exchange strategies | |
95 // from the publishes and subscribes in a report request. | |
96 BroadcastScanConfiguration ExtractTokenExchangeStrategy( | |
97 ReportRequest* request) { | |
Daniel Erat
2014/08/06 16:01:17
can this be a const reference?
Charlie
2014/08/06 19:32:19
Yes. Done.
On 2014/08/06 16:01:17, Daniel Erat wr
| |
98 bool broadcast_only = false; | |
99 bool scan_only = false; | |
100 | |
101 // Strategies for publishes. | |
102 if (request->has_manage_messages_request()) { | |
103 const RepeatedPtrField<PublishedMessage> messages = | |
104 request->manage_messages_request().message_to_publish(); | |
105 for (int i = 0; i < messages.size(); ++i) { | |
106 BroadcastScanConfiguration config = | |
107 GetBroadcastScanConfig(messages.Get(i)); | |
108 broadcast_only = broadcast_only || config == BROADCAST_ONLY; | |
109 scan_only = scan_only || config == SCAN_ONLY; | |
110 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only)) | |
111 return BROADCAST_AND_SCAN; | |
112 } | |
113 } | |
114 | |
115 // Strategies for subscriptions. | |
116 if (request->has_manage_subscriptions_request()) { | |
117 const RepeatedPtrField<Subscription> messages = | |
118 request->manage_subscriptions_request().subscription(); | |
119 for (int i = 0; i < messages.size(); ++i) { | |
120 BroadcastScanConfiguration config = | |
121 GetBroadcastScanConfig(messages.Get(i)); | |
122 broadcast_only = broadcast_only || config == BROADCAST_ONLY; | |
123 scan_only = scan_only || config == SCAN_ONLY; | |
124 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only)) | |
125 return BROADCAST_AND_SCAN; | |
126 } | |
127 } | |
128 | |
129 if (broadcast_only) | |
130 return BROADCAST_ONLY; | |
131 if (scan_only) | |
132 return SCAN_ONLY; | |
133 return BROADCAST_AND_SCAN; | |
Daniel Erat
2014/08/06 16:01:16
do you need to check whether both broadcast_only a
Charlie
2014/08/06 19:32:20
No, this is our default. Added a comment.
On 2014
| |
134 } | |
135 | |
136 DeviceState* GetDeviceCapabilities( | |
Daniel Erat
2014/08/06 16:01:16
return a scoped_ptr instead so the ownership trans
Charlie
2014/08/06 19:32:19
Done.
| |
137 ReportRequest* request) { | |
Daniel Erat
2014/08/06 16:01:16
can this be a const reference?
also, unwrap this
Charlie
2014/08/06 19:32:19
Done.
| |
138 DeviceState* state = new DeviceState; | |
139 | |
140 TokenTechnology* token_technology = | |
141 state->mutable_capabilities()->add_token_technology(); | |
142 token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND); | |
143 | |
144 BroadcastScanConfiguration config = | |
145 ExtractTokenExchangeStrategy(request); | |
146 if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN) | |
147 token_technology->add_instruction_type(TRANSMIT); | |
148 if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN) | |
149 token_technology->add_instruction_type(RECEIVE); | |
150 | |
151 return state; | |
152 } | |
153 | |
154 // TODO(ckehoe): We're keeping this code in a separate function for now | |
155 // because we get a version string from Chrome, but the proto expects | |
156 // an int64 version. We should probably change the version proto | |
157 // to handle a more detailed version. | |
158 ClientVersion* CreateVersion(const std::string& client, | |
159 const std::string& version_name) { | |
160 ClientVersion* version = new ClientVersion; | |
161 | |
162 version->set_client(client); | |
163 version->set_version_name(version_name); | |
164 | |
165 return version; | |
166 } | |
167 | |
168 // Wrapper for the http post constructor. This is the default way | |
169 // to contact the server, but it can be overridden for testing. | |
170 void SendHttpPost(net::URLRequestContextGetter* url_context_getter, | |
171 const std::string& rpc_name, | |
172 scoped_ptr<MessageLite> request_proto, | |
173 const HttpPost::ResponseCallback& callback) { | |
174 new HttpPost(url_context_getter, rpc_name, request_proto.Pass(), callback); | |
175 } | |
176 | |
177 } // namespace | |
178 | |
179 // Public methods | |
180 | |
181 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate) | |
182 : delegate_(delegate), | |
183 invalid_audio_token_cache_( | |
184 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs), | |
185 kMaxInvalidTokens), | |
186 server_post_callback_(base::Bind(&SendHttpPost)) { | |
187 } | |
188 | |
189 RpcHandler::~RpcHandler() {} | |
190 | |
191 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) { | |
192 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest); | |
193 DCHECK(device_id_.empty()); | |
194 device_id_ = delegate_->GetDeviceId(); | |
195 if (!device_id_.empty()) { | |
196 init_done_callback.Run(true); | |
197 return; | |
198 } | |
199 | |
200 request->mutable_push_service()->set_service(PUSH_SERVICE_NONE); | |
201 Identity* identity = | |
202 request->mutable_device_identifiers()->mutable_registrant(); | |
203 identity->set_type(CHROME); | |
204 identity->set_chrome_id(base::GenerateGUID()); | |
205 SendServerRequest( | |
206 kRegisterDeviceRpcName, | |
207 std::string(), | |
208 request.Pass(), | |
209 base::Bind(&RpcHandler::RegisterResponseHandler, | |
210 AsWeakPtr(), | |
211 init_done_callback)); | |
212 } | |
213 | |
214 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) { | |
215 SendReportRequest(request.Pass(), std::string(), StatusCallback()); | |
216 } | |
217 | |
218 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request, | |
219 const std::string& app_id, | |
220 const StatusCallback& status_callback) { | |
221 DCHECK(request.get()); | |
222 DCHECK(!device_id_.empty()) | |
223 << "\nRpcHandler::Initialize() must complete successfully " | |
Daniel Erat
2014/08/06 16:01:17
why do you have a newline at the beginning of this
Charlie
2014/08/06 19:32:19
I liked the log message better all on one line. Bu
| |
224 << "before other RpcHandler methods are called."; | |
225 | |
226 DVLOG(3) << "Sending report request to server."; | |
227 | |
228 request->mutable_update_signals_request()->set_allocated_state( | |
229 GetDeviceCapabilities(request.get())); | |
230 SendServerRequest( | |
231 kReportRequestRpcName, | |
232 app_id, | |
233 request.Pass(), | |
234 base::Bind( | |
235 &RpcHandler::ReportResponseHandler, AsWeakPtr(), status_callback)); | |
236 } | |
237 | |
238 void RpcHandler::ReportTokens(TokenMedium medium, | |
239 const std::vector<std::string>& tokens) { | |
240 DCHECK_EQ(medium, AUDIO_ULTRASOUND_PASSBAND); | |
241 DCHECK(!tokens.empty()); | |
242 | |
243 scoped_ptr<ReportRequest> request(new ReportRequest); | |
244 for (size_t i = 0; i < tokens.size(); ++i) { | |
245 const std::string& token = ToUrlSafe(tokens[i]); | |
246 if (invalid_audio_token_cache_.HasKey(token)) | |
247 continue; | |
248 | |
249 DVLOG(3) << "Sending token " << token << " to server."; | |
250 | |
251 TokenObservation* token_observation = | |
252 request->mutable_update_signals_request()->add_token_observation(); | |
253 token_observation->set_token_id(token); | |
254 | |
255 TokenSignals* signals = token_observation->add_signals(); | |
256 signals->set_medium(medium); | |
257 signals->set_observed_time_millis(base::Time::Now().ToJsTime()); | |
258 } | |
259 SendReportRequest(request.Pass()); | |
260 } | |
261 | |
262 void RpcHandler::ConnectToWhispernet() { | |
263 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); | |
264 | |
265 // Directive_handler will be destructed before us, so unretained is safe. | |
Daniel Erat
2014/08/06 16:01:17
s/Directive_handler/|directive_handler_|/
Charlie
2014/08/06 19:32:20
Done.
| |
266 directive_handler_.reset(new DirectiveHandler( | |
267 base::Bind(&WhispernetClient::DecodeSamples, | |
268 base::Unretained(whispernet_client)), | |
269 base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector, | |
270 base::Unretained(this)))); | |
271 | |
272 whispernet_client->RegisterTokensCallback( | |
273 base::Bind(&RpcHandler::ReportTokens, | |
274 base::Unretained(this), | |
xiyuan
2014/08/05 21:33:39
AsWeakPtr()?
Charlie
2014/08/06 19:32:20
Done.
| |
275 AUDIO_ULTRASOUND_PASSBAND)); | |
276 } | |
277 | |
278 void RpcHandler::DisconnectFromWhispernet() { | |
279 directive_handler_.reset(); | |
280 } | |
281 | |
282 // Private methods | |
283 | |
284 void RpcHandler::RegisterResponseHandler( | |
285 const SuccessCallback& init_done_callback, | |
286 int http_status_code, | |
287 const std::string& response_data) { | |
288 if (http_status_code != net::HTTP_OK) { | |
289 init_done_callback.Run(false); | |
290 return; | |
291 } | |
292 | |
293 RegisterDeviceResponse response; | |
294 if (!response.ParseFromString(response_data)) { | |
295 LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data; | |
296 init_done_callback.Run(false); | |
297 return; | |
298 } | |
299 | |
300 if (!LogStatus(response.header().status())) | |
301 return; | |
302 device_id_ = response.registered_device_id(); | |
303 DCHECK(!device_id_.empty()); | |
304 DVLOG(2) << "Device registration successful: id " << device_id_; | |
305 delegate_->SaveDeviceId(device_id_); | |
306 init_done_callback.Run(true); | |
307 } | |
308 | |
309 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, | |
310 int http_status_code, | |
311 const std::string& response_data) { | |
312 if (http_status_code != net::HTTP_OK) { | |
313 if (!status_callback.is_null()) | |
314 status_callback.Run(FAIL); | |
315 return; | |
316 } | |
317 | |
318 DVLOG(3) << "Received ReportResponse."; | |
319 ReportResponse response; | |
320 if (!response.ParseFromString(response_data)) { | |
321 LOG(ERROR) << "Invalid ReportResponse"; | |
322 if (!status_callback.is_null()) | |
323 status_callback.Run(FAIL); | |
324 return; | |
325 } | |
326 | |
327 if (!LogStatus(response)) { | |
328 if (!status_callback.is_null()) | |
329 status_callback.Run(FAIL); | |
330 return; | |
331 } | |
332 | |
333 if (response.has_manage_messages_response()) { | |
334 RepeatedPtrField<MessageResult> message_results = | |
335 response.manage_messages_response().published_message_result(); | |
336 for (int i = 0; i < message_results.size(); ++i) { | |
337 DVLOG(2) << "Published message with id " | |
338 << message_results.Get(i).published_message_id(); | |
339 } | |
340 } | |
341 | |
342 if (response.has_manage_subscriptions_response()) { | |
343 RepeatedPtrField<SubscriptionResult> subscription_results = | |
344 response.manage_subscriptions_response().subscription_result(); | |
345 for (int i = 0; i < subscription_results.size(); ++i) { | |
346 DVLOG(2) << "Created subscription with id " | |
347 << subscription_results.Get(i).subscription_id(); | |
348 } | |
349 } | |
350 | |
351 if (response.has_update_signals_response()) { | |
352 const UpdateSignalsResponse& update_response = | |
353 response.update_signals_response(); | |
354 DispatchMessages(update_response.message()); | |
355 | |
356 if (directive_handler_.get()) { | |
357 for (int i = 0; i < update_response.directive_size(); ++i) | |
358 directive_handler_->AddDirective(update_response.directive(i)); | |
359 } else { | |
360 DVLOG(1) << "No directive handler."; | |
361 } | |
362 | |
363 RepeatedPtrField<Token> tokens = update_response.token(); | |
364 for (int i = 0; i < tokens.size(); ++i) { | |
365 switch (tokens.Get(i).status()) { | |
366 case VALID: | |
367 // TODO(rkc/ckehoe): Store the token in a valid_token_cache_ with a | |
Daniel Erat
2014/08/06 16:01:16
nit: |valid_token_cache_|
Charlie
2014/08/06 19:32:19
Done.
| |
368 // short ttl (like 10s) and send it up with every report request. | |
Daniel Erat
2014/08/06 16:01:16
nit: s/ttl/TTL/
Charlie
2014/08/06 19:32:19
Done.
| |
369 // Then we'll still get messages while we're waiting to hear it again. | |
370 VLOG(1) << "Got valid token " << tokens.Get(i).id(); | |
371 break; | |
372 case INVALID: | |
373 DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id(); | |
374 invalid_audio_token_cache_.Add(tokens.Get(i).id(), true); | |
375 break; | |
376 default: | |
377 DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code " | |
378 << tokens.Get(i).status(); | |
379 } | |
380 } | |
381 } | |
382 | |
383 // TODO(ckehoe): Return a more detailed status response. | |
384 if (!status_callback.is_null()) | |
385 status_callback.Run(SUCCESS); | |
386 } | |
387 | |
388 void RpcHandler::DispatchMessages( | |
389 const RepeatedPtrField<SubscribedMessage>& messages) { | |
390 if (messages.size() == 0) | |
391 return; | |
392 | |
393 // Index the messages by subscription id. | |
394 std::map<std::string, std::vector<Message> > messages_by_subscription; | |
395 DVLOG(3) << "Dispatching " << messages.size() << " messages"; | |
396 for (int m = 0; m < messages.size(); ++m) { | |
397 const RepeatedPtrField<std::string>& subscription_ids = | |
398 messages.Get(m).subscription_id(); | |
399 for (int s = 0; s < subscription_ids.size(); ++s) { | |
400 messages_by_subscription[subscription_ids.Get(s)].push_back( | |
401 messages.Get(m).published_message()); | |
402 } | |
403 } | |
404 | |
405 // Send the messages for each subscription. | |
406 for (std::map<std::string, std::vector<Message> >::const_iterator | |
407 subscription = messages_by_subscription.begin(); | |
408 subscription != messages_by_subscription.end(); | |
409 ++subscription) { | |
410 // TODO(ckehoe): Once we have the app ID from the server, we need to pass | |
411 // it in here and get rid of the app id registry from the main API class. | |
412 delegate_->HandleMessages("", subscription->first, subscription->second); | |
413 } | |
414 } | |
415 | |
416 RequestHeader* RpcHandler::CreateRequestHeader( | |
417 const std::string& client_name) const { | |
418 RequestHeader* header = new RequestHeader; | |
419 | |
420 header->set_allocated_framework_version( | |
421 CreateVersion("Chrome", delegate_->GetPlatformVersionString())); | |
422 if (!client_name.empty()) { | |
423 header->set_allocated_client_version( | |
424 CreateVersion(client_name, std::string())); | |
425 } | |
426 header->set_current_time_millis(base::Time::Now().ToJsTime()); | |
427 header->set_registered_device_id(device_id_); | |
428 | |
429 return header; | |
430 } | |
431 | |
432 template <class T> | |
433 void RpcHandler::SendServerRequest( | |
434 const std::string& rpc_name, | |
28 const std::string& app_id, | 435 const std::string& app_id, |
29 const StatusCallback& status_callback) { | 436 scoped_ptr<T> request, |
30 } | 437 const HttpPost::ResponseCallback& response_handler) { |
31 | 438 request->set_allocated_header(CreateRequestHeader(app_id)); |
32 void RpcHandler::ReportTokens(copresence::TokenMedium medium, | 439 server_post_callback_.Run(delegate_->GetRequestContext(), |
33 const std::vector<std::string>& tokens) { | 440 rpc_name, |
34 } | 441 make_scoped_ptr<MessageLite>(request.release()), |
35 | 442 response_handler); |
36 void RpcHandler::ConnectToWhispernet(WhispernetClient* whispernet_client) { | 443 } |
37 } | 444 |
38 | 445 void RpcHandler::AudioDirectiveListToWhispernetConnector( |
39 void RpcHandler::DisconnectFromWhispernet() { | 446 const std::string& token, |
447 const WhispernetClient::SamplesCallback& samples_callback) { | |
448 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); | |
449 if (whispernet_client) { | |
450 whispernet_client->RegisterSamplesCallback(samples_callback); | |
451 whispernet_client->EncodeToken(token); | |
452 } | |
40 } | 453 } |
41 | 454 |
42 } // namespace copresence | 455 } // namespace copresence |
OLD | NEW |