OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "components/copresence/rpc/rpc_handler.h" | 5 #include "components/copresence/rpc/rpc_handler.h" |
6 | 6 |
| 7 #include <map> |
| 8 |
7 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/command_line.h" |
| 11 #include "base/guid.h" |
| 12 #include "base/logging.h" |
| 13 #include "base/strings/string_util.h" |
| 14 #include "base/time/time.h" |
| 15 #include "components/copresence/copresence_switches.h" |
| 16 #include "components/copresence/handlers/directive_handler.h" |
| 17 #include "components/copresence/proto/codes.pb.h" |
8 #include "components/copresence/proto/data.pb.h" | 18 #include "components/copresence/proto/data.pb.h" |
9 #include "components/copresence/proto/rpcs.pb.h" | 19 #include "components/copresence/proto/rpcs.pb.h" |
10 #include "components/copresence/public/copresence_client_delegate.h" | 20 #include "components/copresence/public/copresence_client_delegate.h" |
11 #include "components/copresence/public/whispernet_client.h" | 21 #include "net/http/http_status_code.h" |
| 22 |
| 23 // TODO(ckehoe): Return error messages for bad requests. |
12 | 24 |
13 namespace copresence { | 25 namespace copresence { |
14 | 26 |
15 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate, | 27 using google::protobuf::MessageLite; |
16 SuccessCallback init_done_callback) { | 28 using google::protobuf::RepeatedPtrField; |
17 } | 29 |
18 | 30 const char RpcHandler::kReportRequestRpcName[] = "report"; |
19 RpcHandler::~RpcHandler() { | 31 |
20 } | 32 namespace { |
21 | 33 |
22 void RpcHandler::SendReportRequest( | 34 // UrlSafe is defined as: |
23 scoped_ptr<copresence::ReportRequest> request) { | 35 // '/' represented by a '_' and '+' represented by a '-' |
24 } | 36 // TODO(rkc): Move this to the wrapper. |
25 | 37 std::string ToUrlSafe(std::string token) { |
26 void RpcHandler::SendReportRequest( | 38 base::ReplaceChars(token, "+", "-", &token); |
27 scoped_ptr<copresence::ReportRequest> request, | 39 base::ReplaceChars(token, "/", "_", &token); |
| 40 return token; |
| 41 } |
| 42 |
| 43 const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes. |
| 44 const int kMaxInvalidTokens = 10000; |
| 45 const char kRegisterDeviceRpcName[] = "registerdevice"; |
| 46 const char kDefaultCopresenceServer[] = |
| 47 "https://www.googleapis.com/copresence/v2/copresence"; |
| 48 |
| 49 // Logging |
| 50 |
| 51 // Checks for a copresence error. If there is one, logs it and returns true. |
| 52 bool CopresenceErrorLogged(const Status& status) { |
| 53 if (status.code() != OK) { |
| 54 LOG(ERROR) << "Copresence error code " << status.code() |
| 55 << (status.message().empty() ? std::string() : |
| 56 ": " + status.message()); |
| 57 } |
| 58 return status.code() != OK; |
| 59 } |
| 60 |
| 61 void LogIfErrorStatus(const util::error::Code& code, |
| 62 const std::string& context) { |
| 63 LOG_IF(ERROR, code != util::error::OK) |
| 64 << context << " error " << code << ". See " |
| 65 << "cs/google3/util/task/codes.proto for more info."; |
| 66 } |
| 67 |
| 68 // If any errors occurred, logs them and returns true. |
| 69 bool ReportErrorLogged(const ReportResponse& response) { |
| 70 bool result = CopresenceErrorLogged(response.header().status()); |
| 71 |
| 72 // The Report fails or succeeds as a unit. If any responses had errors, |
| 73 // the header will too. Thus we don't need to propagate individual errors. |
| 74 if (response.has_update_signals_response()) |
| 75 LogIfErrorStatus(response.update_signals_response().status(), "Update"); |
| 76 if (response.has_manage_messages_response()) |
| 77 LogIfErrorStatus(response.manage_messages_response().status(), "Publish"); |
| 78 if (response.has_manage_subscriptions_response()) { |
| 79 LogIfErrorStatus(response.manage_subscriptions_response().status(), |
| 80 "Subscribe"); |
| 81 } |
| 82 |
| 83 return result; |
| 84 } |
| 85 |
| 86 // Request construction |
| 87 |
| 88 template <typename T> |
| 89 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) { |
| 90 if (msg.has_token_exchange_strategy() && |
| 91 msg.token_exchange_strategy().has_broadcast_scan_configuration()) { |
| 92 return msg.token_exchange_strategy().broadcast_scan_configuration(); |
| 93 } |
| 94 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN; |
| 95 } |
| 96 |
| 97 // This method will extract token exchange strategies |
| 98 // from the publishes and subscribes in a report request. |
| 99 // TODO(ckehoe): Delete this when the server supports |
| 100 // BroadcastScanConfiguration. |
| 101 BroadcastScanConfiguration ExtractTokenExchangeStrategy( |
| 102 const ReportRequest& request) { |
| 103 bool broadcast_only = false; |
| 104 bool scan_only = false; |
| 105 |
| 106 // Strategies for publishes. |
| 107 if (request.has_manage_messages_request()) { |
| 108 const RepeatedPtrField<PublishedMessage> messages = |
| 109 request.manage_messages_request().message_to_publish(); |
| 110 for (int i = 0; i < messages.size(); ++i) { |
| 111 BroadcastScanConfiguration config = |
| 112 GetBroadcastScanConfig(messages.Get(i)); |
| 113 broadcast_only = broadcast_only || config == BROADCAST_ONLY; |
| 114 scan_only = scan_only || config == SCAN_ONLY; |
| 115 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only)) |
| 116 return BROADCAST_AND_SCAN; |
| 117 } |
| 118 } |
| 119 |
| 120 // Strategies for subscriptions. |
| 121 if (request.has_manage_subscriptions_request()) { |
| 122 const RepeatedPtrField<Subscription> messages = |
| 123 request.manage_subscriptions_request().subscription(); |
| 124 for (int i = 0; i < messages.size(); ++i) { |
| 125 BroadcastScanConfiguration config = |
| 126 GetBroadcastScanConfig(messages.Get(i)); |
| 127 broadcast_only = broadcast_only || config == BROADCAST_ONLY; |
| 128 scan_only = scan_only || config == SCAN_ONLY; |
| 129 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only)) |
| 130 return BROADCAST_AND_SCAN; |
| 131 } |
| 132 } |
| 133 |
| 134 if (broadcast_only) |
| 135 return BROADCAST_ONLY; |
| 136 if (scan_only) |
| 137 return SCAN_ONLY; |
| 138 |
| 139 // If nothing else is specified, default to both broadcast and scan. |
| 140 return BROADCAST_AND_SCAN; |
| 141 } |
| 142 |
| 143 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) { |
| 144 scoped_ptr<DeviceState> state(new DeviceState); |
| 145 |
| 146 TokenTechnology* token_technology = |
| 147 state->mutable_capabilities()->add_token_technology(); |
| 148 token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND); |
| 149 |
| 150 BroadcastScanConfiguration config = |
| 151 ExtractTokenExchangeStrategy(request); |
| 152 if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN) |
| 153 token_technology->add_instruction_type(TRANSMIT); |
| 154 if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN) |
| 155 token_technology->add_instruction_type(RECEIVE); |
| 156 |
| 157 return state.Pass(); |
| 158 } |
| 159 |
| 160 // TODO(ckehoe): We're keeping this code in a separate function for now |
| 161 // because we get a version string from Chrome, but the proto expects |
| 162 // an int64 version. We should probably change the version proto |
| 163 // to handle a more detailed version. |
| 164 ClientVersion* CreateVersion(const std::string& client, |
| 165 const std::string& version_name) { |
| 166 ClientVersion* version = new ClientVersion; |
| 167 |
| 168 version->set_client(client); |
| 169 version->set_version_name(version_name); |
| 170 |
| 171 return version; |
| 172 } |
| 173 |
| 174 } // namespace |
| 175 |
| 176 // Public methods |
| 177 |
| 178 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate) |
| 179 : delegate_(delegate), |
| 180 invalid_audio_token_cache_( |
| 181 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs), |
| 182 kMaxInvalidTokens), |
| 183 server_post_callback_(base::Bind(&RpcHandler::SendHttpPost, |
| 184 base::Unretained(this))) {} |
| 185 |
| 186 RpcHandler::~RpcHandler() {} |
| 187 |
| 188 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) { |
| 189 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest); |
| 190 DCHECK(device_id_.empty()); |
| 191 device_id_ = delegate_->GetDeviceId(); |
| 192 if (!device_id_.empty()) { |
| 193 init_done_callback.Run(true); |
| 194 return; |
| 195 } |
| 196 |
| 197 request->mutable_push_service()->set_service(PUSH_SERVICE_NONE); |
| 198 Identity* identity = |
| 199 request->mutable_device_identifiers()->mutable_registrant(); |
| 200 identity->set_type(CHROME); |
| 201 identity->set_chrome_id(base::GenerateGUID()); |
| 202 SendServerRequest( |
| 203 kRegisterDeviceRpcName, |
| 204 std::string(), |
| 205 request.Pass(), |
| 206 base::Bind(&RpcHandler::RegisterResponseHandler, |
| 207 // On Shutdown, this request will be cancelled. |
| 208 base::Unretained(this), |
| 209 init_done_callback)); |
| 210 } |
| 211 |
| 212 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) { |
| 213 SendReportRequest(request.Pass(), std::string(), StatusCallback()); |
| 214 } |
| 215 |
| 216 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request, |
| 217 const std::string& app_id, |
| 218 const StatusCallback& status_callback) { |
| 219 DCHECK(request.get()); |
| 220 DCHECK(!device_id_.empty()) |
| 221 << "RpcHandler::Initialize() must complete successfully " |
| 222 << "before other RpcHandler methods are called."; |
| 223 |
| 224 DVLOG(3) << "Sending report request to server."; |
| 225 |
| 226 request->mutable_update_signals_request()->set_allocated_state( |
| 227 GetDeviceCapabilities(*request).release()); |
| 228 SendServerRequest(kReportRequestRpcName, |
| 229 app_id, |
| 230 request.Pass(), |
| 231 base::Bind(&RpcHandler::ReportResponseHandler, |
| 232 // On Shutdown, this request will be cancelled. |
| 233 base::Unretained(this), |
| 234 status_callback)); |
| 235 } |
| 236 |
| 237 void RpcHandler::ReportTokens(TokenMedium medium, |
| 238 const std::vector<std::string>& tokens) { |
| 239 DCHECK_EQ(medium, AUDIO_ULTRASOUND_PASSBAND); |
| 240 DCHECK(!tokens.empty()); |
| 241 |
| 242 scoped_ptr<ReportRequest> request(new ReportRequest); |
| 243 for (size_t i = 0; i < tokens.size(); ++i) { |
| 244 const std::string& token = ToUrlSafe(tokens[i]); |
| 245 if (invalid_audio_token_cache_.HasKey(token)) |
| 246 continue; |
| 247 |
| 248 DVLOG(3) << "Sending token " << token << " to server."; |
| 249 |
| 250 TokenObservation* token_observation = |
| 251 request->mutable_update_signals_request()->add_token_observation(); |
| 252 token_observation->set_token_id(token); |
| 253 |
| 254 TokenSignals* signals = token_observation->add_signals(); |
| 255 signals->set_medium(medium); |
| 256 signals->set_observed_time_millis(base::Time::Now().ToJsTime()); |
| 257 } |
| 258 SendReportRequest(request.Pass()); |
| 259 } |
| 260 |
| 261 void RpcHandler::ConnectToWhispernet() { |
| 262 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); |
| 263 |
| 264 // |directive_handler_| will be destructed on Shutdown, so unretained is safe. |
| 265 directive_handler_.reset(new DirectiveHandler); |
| 266 directive_handler_->Initialize( |
| 267 base::Bind(&WhispernetClient::DecodeSamples, |
| 268 base::Unretained(whispernet_client)), |
| 269 base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector, |
| 270 base::Unretained(this))); |
| 271 |
| 272 whispernet_client->RegisterTokensCallback( |
| 273 base::Bind(&RpcHandler::ReportTokens, |
| 274 // On Shutdown, this callback will be disconnected. |
| 275 base::Unretained(this), |
| 276 AUDIO_ULTRASOUND_PASSBAND)); |
| 277 } |
| 278 |
| 279 void RpcHandler::Shutdown() { |
| 280 for (std::set<HttpPost*>::iterator post = pending_posts_.begin(); |
| 281 post != pending_posts_.end(); ++post) { |
| 282 (*post)->Cancel(); |
| 283 } |
| 284 |
| 285 delegate_->GetWhispernetClient()->RegisterTokensCallback( |
| 286 WhispernetClient::TokensCallback()); |
| 287 directive_handler_.reset(); |
| 288 } |
| 289 |
| 290 // Private methods |
| 291 |
| 292 void RpcHandler::RegisterResponseHandler( |
| 293 const SuccessCallback& init_done_callback, |
| 294 int http_status_code, |
| 295 const std::string& response_data, |
| 296 HttpPost* completed_post) { |
| 297 if (completed_post) |
| 298 pending_posts_.erase(completed_post); |
| 299 |
| 300 if (http_status_code != net::HTTP_OK) { |
| 301 init_done_callback.Run(false); |
| 302 return; |
| 303 } |
| 304 |
| 305 RegisterDeviceResponse response; |
| 306 if (!response.ParseFromString(response_data)) { |
| 307 LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data; |
| 308 init_done_callback.Run(false); |
| 309 return; |
| 310 } |
| 311 |
| 312 if (CopresenceErrorLogged(response.header().status())) |
| 313 return; |
| 314 device_id_ = response.registered_device_id(); |
| 315 DCHECK(!device_id_.empty()); |
| 316 DVLOG(2) << "Device registration successful: id " << device_id_; |
| 317 delegate_->SaveDeviceId(device_id_); |
| 318 init_done_callback.Run(true); |
| 319 } |
| 320 |
| 321 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, |
| 322 int http_status_code, |
| 323 const std::string& response_data, |
| 324 HttpPost* completed_post) { |
| 325 if (completed_post) |
| 326 pending_posts_.erase(completed_post); |
| 327 |
| 328 if (http_status_code != net::HTTP_OK) { |
| 329 if (!status_callback.is_null()) |
| 330 status_callback.Run(FAIL); |
| 331 return; |
| 332 } |
| 333 |
| 334 DVLOG(3) << "Received ReportResponse."; |
| 335 ReportResponse response; |
| 336 if (!response.ParseFromString(response_data)) { |
| 337 LOG(ERROR) << "Invalid ReportResponse"; |
| 338 if (!status_callback.is_null()) |
| 339 status_callback.Run(FAIL); |
| 340 return; |
| 341 } |
| 342 |
| 343 if (ReportErrorLogged(response)) { |
| 344 if (!status_callback.is_null()) |
| 345 status_callback.Run(FAIL); |
| 346 return; |
| 347 } |
| 348 |
| 349 const RepeatedPtrField<MessageResult>& message_results = |
| 350 response.manage_messages_response().published_message_result(); |
| 351 for (int i = 0; i < message_results.size(); ++i) { |
| 352 DVLOG(2) << "Published message with id " |
| 353 << message_results.Get(i).published_message_id(); |
| 354 } |
| 355 |
| 356 const RepeatedPtrField<SubscriptionResult>& subscription_results = |
| 357 response.manage_subscriptions_response().subscription_result(); |
| 358 for (int i = 0; i < subscription_results.size(); ++i) { |
| 359 DVLOG(2) << "Created subscription with id " |
| 360 << subscription_results.Get(i).subscription_id(); |
| 361 } |
| 362 |
| 363 if (response.has_update_signals_response()) { |
| 364 const UpdateSignalsResponse& update_response = |
| 365 response.update_signals_response(); |
| 366 DispatchMessages(update_response.message()); |
| 367 |
| 368 if (directive_handler_.get()) { |
| 369 for (int i = 0; i < update_response.directive_size(); ++i) |
| 370 directive_handler_->AddDirective(update_response.directive(i)); |
| 371 } else { |
| 372 DVLOG(1) << "No directive handler."; |
| 373 } |
| 374 |
| 375 const RepeatedPtrField<Token>& tokens = update_response.token(); |
| 376 for (int i = 0; i < tokens.size(); ++i) { |
| 377 switch (tokens.Get(i).status()) { |
| 378 case VALID: |
| 379 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a |
| 380 // short TTL (like 10s) and send it up with every report request. |
| 381 // Then we'll still get messages while we're waiting to hear it again. |
| 382 VLOG(1) << "Got valid token " << tokens.Get(i).id(); |
| 383 break; |
| 384 case INVALID: |
| 385 DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id(); |
| 386 invalid_audio_token_cache_.Add(tokens.Get(i).id(), true); |
| 387 break; |
| 388 default: |
| 389 DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code " |
| 390 << tokens.Get(i).status(); |
| 391 } |
| 392 } |
| 393 } |
| 394 |
| 395 // TODO(ckehoe): Return a more detailed status response. |
| 396 if (!status_callback.is_null()) |
| 397 status_callback.Run(SUCCESS); |
| 398 } |
| 399 |
| 400 void RpcHandler::DispatchMessages( |
| 401 const RepeatedPtrField<SubscribedMessage>& messages) { |
| 402 if (messages.size() == 0) |
| 403 return; |
| 404 |
| 405 // Index the messages by subscription id. |
| 406 std::map<std::string, std::vector<Message> > messages_by_subscription; |
| 407 DVLOG(3) << "Dispatching " << messages.size() << " messages"; |
| 408 for (int m = 0; m < messages.size(); ++m) { |
| 409 const RepeatedPtrField<std::string>& subscription_ids = |
| 410 messages.Get(m).subscription_id(); |
| 411 for (int s = 0; s < subscription_ids.size(); ++s) { |
| 412 messages_by_subscription[subscription_ids.Get(s)].push_back( |
| 413 messages.Get(m).published_message()); |
| 414 } |
| 415 } |
| 416 |
| 417 // Send the messages for each subscription. |
| 418 for (std::map<std::string, std::vector<Message> >::const_iterator |
| 419 subscription = messages_by_subscription.begin(); |
| 420 subscription != messages_by_subscription.end(); |
| 421 ++subscription) { |
| 422 // TODO(ckehoe): Once we have the app ID from the server, we need to pass |
| 423 // it in here and get rid of the app id registry from the main API class. |
| 424 delegate_->HandleMessages("", subscription->first, subscription->second); |
| 425 } |
| 426 } |
| 427 |
| 428 RequestHeader* RpcHandler::CreateRequestHeader( |
| 429 const std::string& client_name) const { |
| 430 RequestHeader* header = new RequestHeader; |
| 431 |
| 432 header->set_allocated_framework_version( |
| 433 CreateVersion("Chrome", delegate_->GetPlatformVersionString())); |
| 434 if (!client_name.empty()) { |
| 435 header->set_allocated_client_version( |
| 436 CreateVersion(client_name, std::string())); |
| 437 } |
| 438 header->set_current_time_millis(base::Time::Now().ToJsTime()); |
| 439 header->set_registered_device_id(device_id_); |
| 440 |
| 441 return header; |
| 442 } |
| 443 |
| 444 template <class T> |
| 445 void RpcHandler::SendServerRequest( |
| 446 const std::string& rpc_name, |
28 const std::string& app_id, | 447 const std::string& app_id, |
29 const StatusCallback& status_callback) { | 448 scoped_ptr<T> request, |
30 } | 449 const HttpPost::ResponseCallback& response_handler) { |
31 | 450 request->set_allocated_header(CreateRequestHeader(app_id)); |
32 void RpcHandler::ReportTokens(copresence::TokenMedium medium, | 451 server_post_callback_.Run(delegate_->GetRequestContext(), |
33 const std::vector<std::string>& tokens) { | 452 rpc_name, |
34 } | 453 make_scoped_ptr<MessageLite>(request.release()), |
35 | 454 response_handler); |
36 void RpcHandler::ConnectToWhispernet(WhispernetClient* whispernet_client) { | 455 } |
37 } | 456 |
38 | 457 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter, |
39 void RpcHandler::DisconnectFromWhispernet() { | 458 const std::string& rpc_name, |
| 459 scoped_ptr<MessageLite> request_proto, |
| 460 const HttpPost::ResponseCallback& callback) { |
| 461 // Create the base URL to call. |
| 462 CommandLine* command_line = CommandLine::ForCurrentProcess(); |
| 463 const std::string copresence_server_host = |
| 464 command_line->HasSwitch(switches::kCopresenceServer) ? |
| 465 command_line->GetSwitchValueASCII(switches::kCopresenceServer) : |
| 466 kDefaultCopresenceServer; |
| 467 |
| 468 // Create the request and keep a pointer until it completes. |
| 469 pending_posts_.insert(new HttpPost(url_context_getter, |
| 470 copresence_server_host, |
| 471 rpc_name, |
| 472 *request_proto, |
| 473 callback)); |
| 474 } |
| 475 |
| 476 void RpcHandler::AudioDirectiveListToWhispernetConnector( |
| 477 const std::string& token, |
| 478 const WhispernetClient::SamplesCallback& samples_callback) { |
| 479 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); |
| 480 if (whispernet_client) { |
| 481 whispernet_client->RegisterSamplesCallback(samples_callback); |
| 482 whispernet_client->EncodeToken(token); |
| 483 } |
40 } | 484 } |
41 | 485 |
42 } // namespace copresence | 486 } // namespace copresence |
OLD | NEW |