OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "components/copresence/rpc/rpc_handler.h" | 5 #include "components/copresence/rpc/rpc_handler.h" |
6 | 6 |
7 #include <map> | |
8 | |
7 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/command_line.h" | |
11 #include "base/guid.h" | |
12 #include "base/logging.h" | |
13 #include "base/strings/string_util.h" | |
14 #include "base/time/time.h" | |
15 #include "components/copresence/copresence_switches.h" | |
16 #include "components/copresence/handlers/directive_handler.h" | |
17 #include "components/copresence/proto/codes.pb.h" | |
8 #include "components/copresence/proto/data.pb.h" | 18 #include "components/copresence/proto/data.pb.h" |
9 #include "components/copresence/proto/rpcs.pb.h" | 19 #include "components/copresence/proto/rpcs.pb.h" |
10 #include "components/copresence/public/copresence_client_delegate.h" | 20 #include "components/copresence/public/copresence_client_delegate.h" |
11 #include "components/copresence/public/whispernet_client.h" | 21 #include "net/http/http_status_code.h" |
22 | |
23 // TODO(ckehoe): Return error messages for bad requests. | |
12 | 24 |
13 namespace copresence { | 25 namespace copresence { |
14 | 26 |
15 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate, | 27 using google::protobuf::MessageLite; |
16 SuccessCallback init_done_callback) { | 28 using google::protobuf::RepeatedPtrField; |
17 } | 29 |
30 const char RpcHandler::kReportRequestRpcName[] = "report"; | |
31 | |
32 namespace { | |
33 | |
34 // UrlSafe is defined as: | |
35 // '/' represented by a '_' and '+' represented by a '-' | |
36 // TODO(rkc): Move this to the wrapper. | |
37 std::string ToUrlSafe(std::string token) { | |
38 base::ReplaceChars(token, "+", "-", &token); | |
39 base::ReplaceChars(token, "/", "_", &token); | |
40 return token; | |
41 } | |
42 | |
43 const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes. | |
44 const int kMaxInvalidTokens = 10000; | |
45 const char kRegisterDeviceRpcName[] = "registerdevice"; | |
46 const char kDefaultCopresenceServer[] = | |
47 "https://www.googleapis.com/copresence/v2/copresence"; | |
48 | |
49 // Logging | |
50 | |
51 // Checks for a copresence error. If there is one, logs it and returns true. | |
52 bool CopresenceErrorLogged(const Status& status) { | |
53 if (status.code() != OK) { | |
54 LOG(ERROR) << "Copresence error code " << status.code() | |
55 << (status.message().empty() ? std::string() : | |
56 ": " + status.message()); | |
57 } | |
58 return status.code() != OK; | |
59 } | |
60 | |
61 void LogIfErrorStatus(const util::error::Code& code, | |
62 const std::string& context) { | |
63 LOG_IF(ERROR, code != util::error::OK) | |
64 << context << " error " << code << ". See " | |
65 << "cs/google3/util/task/codes.proto for more info."; | |
66 } | |
67 | |
68 // If any errors occurred, logs them and returns true. | |
69 bool ReportErrorLogged(const ReportResponse& response) { | |
70 bool result = CopresenceErrorLogged(response.header().status()); | |
71 | |
72 // The Report fails or succeeds as a unit. If any responses had errors, | |
73 // the header will too. Thus we don't need to propagate individual errors. | |
74 if (response.has_update_signals_response()) | |
75 LogIfErrorStatus(response.update_signals_response().status(), "Update"); | |
76 if (response.has_manage_messages_response()) | |
77 LogIfErrorStatus(response.manage_messages_response().status(), "Publish"); | |
78 if (response.has_manage_subscriptions_response()) { | |
79 LogIfErrorStatus(response.manage_subscriptions_response().status(), | |
80 "Subscribe"); | |
81 } | |
82 | |
83 return result; | |
84 } | |
85 | |
86 // Request construction | |
87 | |
88 template <typename T> | |
89 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) { | |
90 if (msg.has_token_exchange_strategy() && | |
91 msg.token_exchange_strategy().has_broadcast_scan_configuration()) { | |
92 return msg.token_exchange_strategy().broadcast_scan_configuration(); | |
93 } | |
94 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN; | |
95 } | |
96 | |
97 // This method will extract token exchange strategies | |
98 // from the publishes and subscribes in a report request. | |
99 // TODO(ckehoe): Delete this when the server supports | |
100 // BroadcastScanConfiguration. | |
101 BroadcastScanConfiguration ExtractTokenExchangeStrategy( | |
102 const ReportRequest& request) { | |
103 bool broadcast_only = false; | |
104 bool scan_only = false; | |
105 | |
106 // Strategies for publishes. | |
107 if (request.has_manage_messages_request()) { | |
108 const RepeatedPtrField<PublishedMessage> messages = | |
109 request.manage_messages_request().message_to_publish(); | |
110 for (int i = 0; i < messages.size(); ++i) { | |
111 BroadcastScanConfiguration config = | |
112 GetBroadcastScanConfig(messages.Get(i)); | |
113 broadcast_only = broadcast_only || config == BROADCAST_ONLY; | |
114 scan_only = scan_only || config == SCAN_ONLY; | |
115 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only)) | |
116 return BROADCAST_AND_SCAN; | |
117 } | |
118 } | |
119 | |
120 // Strategies for subscriptions. | |
121 if (request.has_manage_subscriptions_request()) { | |
122 const RepeatedPtrField<Subscription> messages = | |
123 request.manage_subscriptions_request().subscription(); | |
124 for (int i = 0; i < messages.size(); ++i) { | |
125 BroadcastScanConfiguration config = | |
126 GetBroadcastScanConfig(messages.Get(i)); | |
127 broadcast_only = broadcast_only || config == BROADCAST_ONLY; | |
128 scan_only = scan_only || config == SCAN_ONLY; | |
129 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only)) | |
130 return BROADCAST_AND_SCAN; | |
131 } | |
132 } | |
133 | |
134 if (broadcast_only) | |
135 return BROADCAST_ONLY; | |
136 if (scan_only) | |
137 return SCAN_ONLY; | |
138 | |
139 // If nothing else is specified, default to both broadcast and scan. | |
140 return BROADCAST_AND_SCAN; | |
141 } | |
142 | |
143 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) { | |
144 scoped_ptr<DeviceState> state(new DeviceState); | |
145 | |
146 TokenTechnology* token_technology = | |
147 state->mutable_capabilities()->add_token_technology(); | |
148 token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND); | |
149 | |
150 BroadcastScanConfiguration config = | |
151 ExtractTokenExchangeStrategy(request); | |
152 if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN) | |
153 token_technology->add_instruction_type(TRANSMIT); | |
154 if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN) | |
155 token_technology->add_instruction_type(RECEIVE); | |
156 | |
157 return state.Pass(); | |
158 } | |
159 | |
160 // TODO(ckehoe): We're keeping this code in a separate function for now | |
161 // because we get a version string from Chrome, but the proto expects | |
162 // an int64 version. We should probably change the version proto | |
163 // to handle a more detailed version. | |
164 ClientVersion* CreateVersion(const std::string& client, | |
165 const std::string& version_name) { | |
166 ClientVersion* version = new ClientVersion; | |
167 | |
168 version->set_client(client); | |
169 version->set_version_name(version_name); | |
170 | |
171 return version; | |
172 } | |
173 | |
174 } // namespace | |
175 | |
176 // Public methods | |
177 | |
178 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate) | |
179 : delegate_(delegate), | |
180 invalid_audio_token_cache_( | |
181 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs), | |
182 kMaxInvalidTokens), | |
183 server_post_callback_(base::Bind(&RpcHandler::SendHttpPost, | |
184 base::Unretained(this))) {} | |
18 | 185 |
19 RpcHandler::~RpcHandler() { | 186 RpcHandler::~RpcHandler() { |
20 } | 187 for (std::set<HttpPost*>::iterator post = pending_posts_.begin(); |
21 | 188 post != pending_posts_.end(); ++post) { |
22 void RpcHandler::SendReportRequest( | 189 delete *post; |
23 scoped_ptr<copresence::ReportRequest> request) { | 190 } |
24 } | 191 |
25 | 192 if (delegate_ && delegate_->GetWhispernetClient()) { |
26 void RpcHandler::SendReportRequest( | 193 delegate_->GetWhispernetClient()->RegisterTokensCallback( |
27 scoped_ptr<copresence::ReportRequest> request, | 194 WhispernetClient::TokensCallback()); |
195 } | |
196 } | |
197 | |
198 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) { | |
199 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest); | |
200 DCHECK(device_id_.empty()); | |
201 device_id_ = delegate_->GetDeviceId(); | |
202 if (!device_id_.empty()) { | |
203 init_done_callback.Run(true); | |
204 return; | |
205 } | |
206 | |
207 request->mutable_push_service()->set_service(PUSH_SERVICE_NONE); | |
208 Identity* identity = | |
209 request->mutable_device_identifiers()->mutable_registrant(); | |
210 identity->set_type(CHROME); | |
211 identity->set_chrome_id(base::GenerateGUID()); | |
212 SendServerRequest( | |
213 kRegisterDeviceRpcName, | |
214 std::string(), | |
215 request.Pass(), | |
216 base::Bind(&RpcHandler::RegisterResponseHandler, | |
217 // On delete, this request will be cancelled. | |
rkc
2014/08/07 23:41:18
nit: s/delete/destruction
Charlie
2014/08/07 23:57:11
Done.
| |
218 base::Unretained(this), | |
219 init_done_callback)); | |
220 } | |
221 | |
222 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) { | |
223 SendReportRequest(request.Pass(), std::string(), StatusCallback()); | |
224 } | |
225 | |
226 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request, | |
227 const std::string& app_id, | |
228 const StatusCallback& status_callback) { | |
229 DCHECK(request.get()); | |
230 DCHECK(!device_id_.empty()) | |
231 << "RpcHandler::Initialize() must complete successfully " | |
232 << "before other RpcHandler methods are called."; | |
233 | |
234 DVLOG(3) << "Sending report request to server."; | |
235 | |
236 request->mutable_update_signals_request()->set_allocated_state( | |
237 GetDeviceCapabilities(*request).release()); | |
238 SendServerRequest(kReportRequestRpcName, | |
239 app_id, | |
240 request.Pass(), | |
241 base::Bind(&RpcHandler::ReportResponseHandler, | |
242 // On delete, this request will be cancelled. | |
rkc
2014/08/07 23:41:18
nit: ditto
Charlie
2014/08/07 23:57:11
Done.
| |
243 base::Unretained(this), | |
244 status_callback)); | |
245 } | |
246 | |
247 void RpcHandler::ReportTokens(TokenMedium medium, | |
248 const std::vector<std::string>& tokens) { | |
249 DCHECK_EQ(medium, AUDIO_ULTRASOUND_PASSBAND); | |
250 DCHECK(!tokens.empty()); | |
251 | |
252 scoped_ptr<ReportRequest> request(new ReportRequest); | |
253 for (size_t i = 0; i < tokens.size(); ++i) { | |
254 const std::string& token = ToUrlSafe(tokens[i]); | |
255 if (invalid_audio_token_cache_.HasKey(token)) | |
256 continue; | |
257 | |
258 DVLOG(3) << "Sending token " << token << " to server."; | |
259 | |
260 TokenObservation* token_observation = | |
261 request->mutable_update_signals_request()->add_token_observation(); | |
262 token_observation->set_token_id(token); | |
263 | |
264 TokenSignals* signals = token_observation->add_signals(); | |
265 signals->set_medium(medium); | |
266 signals->set_observed_time_millis(base::Time::Now().ToJsTime()); | |
267 } | |
268 SendReportRequest(request.Pass()); | |
269 } | |
270 | |
271 void RpcHandler::ConnectToWhispernet() { | |
272 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); | |
273 | |
274 // |directive_handler_| will be destructed with us, so unretained is safe. | |
275 directive_handler_.reset(new DirectiveHandler); | |
276 directive_handler_->Initialize( | |
277 base::Bind(&WhispernetClient::DecodeSamples, | |
278 base::Unretained(whispernet_client)), | |
279 base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector, | |
280 base::Unretained(this))); | |
281 | |
282 whispernet_client->RegisterTokensCallback( | |
283 base::Bind(&RpcHandler::ReportTokens, | |
284 // On delete, this callback will be disconnected. | |
285 base::Unretained(this), | |
286 AUDIO_ULTRASOUND_PASSBAND)); | |
287 } | |
288 | |
289 // Private methods | |
290 | |
291 void RpcHandler::RegisterResponseHandler( | |
292 const SuccessCallback& init_done_callback, | |
293 int http_status_code, | |
294 const std::string& response_data, | |
295 HttpPost* completed_post) { | |
296 if (completed_post) { | |
297 DCHECK(pending_posts_.erase(completed_post)); | |
298 delete completed_post; | |
299 } | |
300 | |
301 if (http_status_code != net::HTTP_OK) { | |
302 init_done_callback.Run(false); | |
303 return; | |
304 } | |
305 | |
306 RegisterDeviceResponse response; | |
307 if (!response.ParseFromString(response_data)) { | |
308 LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data; | |
309 init_done_callback.Run(false); | |
310 return; | |
311 } | |
312 | |
313 if (CopresenceErrorLogged(response.header().status())) | |
314 return; | |
315 device_id_ = response.registered_device_id(); | |
316 DCHECK(!device_id_.empty()); | |
317 DVLOG(2) << "Device registration successful: id " << device_id_; | |
318 delegate_->SaveDeviceId(device_id_); | |
319 init_done_callback.Run(true); | |
320 } | |
321 | |
322 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, | |
323 int http_status_code, | |
324 const std::string& response_data, | |
325 HttpPost* completed_post) { | |
326 if (completed_post) { | |
327 DCHECK(pending_posts_.erase(completed_post)); | |
328 delete completed_post; | |
329 } | |
330 | |
331 if (http_status_code != net::HTTP_OK) { | |
332 if (!status_callback.is_null()) | |
333 status_callback.Run(FAIL); | |
334 return; | |
335 } | |
336 | |
337 DVLOG(3) << "Received ReportResponse."; | |
338 ReportResponse response; | |
339 if (!response.ParseFromString(response_data)) { | |
340 LOG(ERROR) << "Invalid ReportResponse"; | |
341 if (!status_callback.is_null()) | |
342 status_callback.Run(FAIL); | |
343 return; | |
344 } | |
345 | |
346 if (ReportErrorLogged(response)) { | |
347 if (!status_callback.is_null()) | |
348 status_callback.Run(FAIL); | |
349 return; | |
350 } | |
351 | |
352 const RepeatedPtrField<MessageResult>& message_results = | |
353 response.manage_messages_response().published_message_result(); | |
354 for (int i = 0; i < message_results.size(); ++i) { | |
355 DVLOG(2) << "Published message with id " | |
356 << message_results.Get(i).published_message_id(); | |
357 } | |
358 | |
359 const RepeatedPtrField<SubscriptionResult>& subscription_results = | |
360 response.manage_subscriptions_response().subscription_result(); | |
361 for (int i = 0; i < subscription_results.size(); ++i) { | |
362 DVLOG(2) << "Created subscription with id " | |
363 << subscription_results.Get(i).subscription_id(); | |
364 } | |
365 | |
366 if (response.has_update_signals_response()) { | |
367 const UpdateSignalsResponse& update_response = | |
368 response.update_signals_response(); | |
369 DispatchMessages(update_response.message()); | |
370 | |
371 if (directive_handler_.get()) { | |
372 for (int i = 0; i < update_response.directive_size(); ++i) | |
373 directive_handler_->AddDirective(update_response.directive(i)); | |
374 } else { | |
375 DVLOG(1) << "No directive handler."; | |
376 } | |
377 | |
378 const RepeatedPtrField<Token>& tokens = update_response.token(); | |
379 for (int i = 0; i < tokens.size(); ++i) { | |
380 switch (tokens.Get(i).status()) { | |
381 case VALID: | |
382 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a | |
383 // short TTL (like 10s) and send it up with every report request. | |
384 // Then we'll still get messages while we're waiting to hear it again. | |
385 VLOG(1) << "Got valid token " << tokens.Get(i).id(); | |
386 break; | |
387 case INVALID: | |
388 DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id(); | |
389 invalid_audio_token_cache_.Add(tokens.Get(i).id(), true); | |
390 break; | |
391 default: | |
392 DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code " | |
393 << tokens.Get(i).status(); | |
394 } | |
395 } | |
396 } | |
397 | |
398 // TODO(ckehoe): Return a more detailed status response. | |
399 if (!status_callback.is_null()) | |
400 status_callback.Run(SUCCESS); | |
401 } | |
402 | |
403 void RpcHandler::DispatchMessages( | |
404 const RepeatedPtrField<SubscribedMessage>& messages) { | |
405 if (messages.size() == 0) | |
406 return; | |
407 | |
408 // Index the messages by subscription id. | |
409 std::map<std::string, std::vector<Message> > messages_by_subscription; | |
410 DVLOG(3) << "Dispatching " << messages.size() << " messages"; | |
411 for (int m = 0; m < messages.size(); ++m) { | |
412 const RepeatedPtrField<std::string>& subscription_ids = | |
413 messages.Get(m).subscription_id(); | |
414 for (int s = 0; s < subscription_ids.size(); ++s) { | |
415 messages_by_subscription[subscription_ids.Get(s)].push_back( | |
416 messages.Get(m).published_message()); | |
417 } | |
418 } | |
419 | |
420 // Send the messages for each subscription. | |
421 for (std::map<std::string, std::vector<Message> >::const_iterator | |
422 subscription = messages_by_subscription.begin(); | |
423 subscription != messages_by_subscription.end(); | |
424 ++subscription) { | |
425 // TODO(ckehoe): Once we have the app ID from the server, we need to pass | |
426 // it in here and get rid of the app id registry from the main API class. | |
427 delegate_->HandleMessages("", subscription->first, subscription->second); | |
428 } | |
429 } | |
430 | |
431 RequestHeader* RpcHandler::CreateRequestHeader( | |
432 const std::string& client_name) const { | |
433 RequestHeader* header = new RequestHeader; | |
434 | |
435 header->set_allocated_framework_version( | |
436 CreateVersion("Chrome", delegate_->GetPlatformVersionString())); | |
437 if (!client_name.empty()) { | |
438 header->set_allocated_client_version( | |
439 CreateVersion(client_name, std::string())); | |
440 } | |
441 header->set_current_time_millis(base::Time::Now().ToJsTime()); | |
442 header->set_registered_device_id(device_id_); | |
443 | |
444 return header; | |
445 } | |
446 | |
447 template <class T> | |
448 void RpcHandler::SendServerRequest( | |
449 const std::string& rpc_name, | |
28 const std::string& app_id, | 450 const std::string& app_id, |
29 const StatusCallback& status_callback) { | 451 scoped_ptr<T> request, |
30 } | 452 const HttpPost::ResponseCallback& response_handler) { |
31 | 453 request->set_allocated_header(CreateRequestHeader(app_id)); |
32 void RpcHandler::ReportTokens(copresence::TokenMedium medium, | 454 server_post_callback_.Run(delegate_->GetRequestContext(), |
33 const std::vector<std::string>& tokens) { | 455 rpc_name, |
34 } | 456 make_scoped_ptr<MessageLite>(request.release()), |
35 | 457 response_handler); |
36 void RpcHandler::ConnectToWhispernet(WhispernetClient* whispernet_client) { | 458 } |
37 } | 459 |
38 | 460 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter, |
39 void RpcHandler::DisconnectFromWhispernet() { | 461 const std::string& rpc_name, |
462 scoped_ptr<MessageLite> request_proto, | |
463 const HttpPost::ResponseCallback& callback) { | |
464 // Create the base URL to call. | |
465 CommandLine* command_line = CommandLine::ForCurrentProcess(); | |
466 const std::string copresence_server_host = | |
467 command_line->HasSwitch(switches::kCopresenceServer) ? | |
468 command_line->GetSwitchValueASCII(switches::kCopresenceServer) : | |
469 kDefaultCopresenceServer; | |
470 | |
471 // Create the request and keep a pointer until it completes. | |
472 pending_posts_.insert(new HttpPost(url_context_getter, | |
473 copresence_server_host, | |
474 rpc_name, | |
475 *request_proto, | |
476 callback)); | |
477 } | |
478 | |
479 void RpcHandler::AudioDirectiveListToWhispernetConnector( | |
480 const std::string& token, | |
481 const WhispernetClient::SamplesCallback& samples_callback) { | |
482 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); | |
483 if (whispernet_client) { | |
484 whispernet_client->RegisterSamplesCallback(samples_callback); | |
485 whispernet_client->EncodeToken(token); | |
486 } | |
40 } | 487 } |
41 | 488 |
42 } // namespace copresence | 489 } // namespace copresence |
OLD | NEW |