Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "components/copresence/rpc/rpc_handler.h" | 5 #include "components/copresence/rpc/rpc_handler.h" |
| 6 | 6 |
| 7 #include <map> | 7 #include <map> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/command_line.h" | 10 #include "base/command_line.h" |
| (...skipping 141 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 152 | 152 |
| 153 RpcHandler::RpcHandler(CopresenceDelegate* delegate) | 153 RpcHandler::RpcHandler(CopresenceDelegate* delegate) |
| 154 : delegate_(delegate), | 154 : delegate_(delegate), |
| 155 invalid_audio_token_cache_( | 155 invalid_audio_token_cache_( |
| 156 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs), | 156 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs), |
| 157 kMaxInvalidTokens), | 157 kMaxInvalidTokens), |
| 158 server_post_callback_(base::Bind(&RpcHandler::SendHttpPost, | 158 server_post_callback_(base::Bind(&RpcHandler::SendHttpPost, |
| 159 base::Unretained(this))) {} | 159 base::Unretained(this))) {} |
| 160 | 160 |
| 161 RpcHandler::~RpcHandler() { | 161 RpcHandler::~RpcHandler() { |
| 162 for (std::set<HttpPost*>::iterator post = pending_posts_.begin(); | 162 for (HttpPost* post : pending_posts_) { |
| 163 post != pending_posts_.end(); ++post) { | 163 delete post; |
| 164 delete *post; | |
| 165 } | 164 } |
| 166 | 165 |
| 167 if (delegate_ && delegate_->GetWhispernetClient()) { | 166 if (delegate_ && delegate_->GetWhispernetClient()) { |
| 168 delegate_->GetWhispernetClient()->RegisterTokensCallback( | 167 delegate_->GetWhispernetClient()->RegisterTokensCallback( |
| 169 WhispernetClient::TokensCallback()); | 168 WhispernetClient::TokensCallback()); |
| 170 } | 169 } |
| 171 } | 170 } |
| 172 | 171 |
| 173 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) { | 172 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) { |
| 174 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest); | 173 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest); |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 218 // On destruction, this request will be cancelled. | 217 // On destruction, this request will be cancelled. |
| 219 base::Bind(&RpcHandler::ReportResponseHandler, | 218 base::Bind(&RpcHandler::ReportResponseHandler, |
| 220 base::Unretained(this), | 219 base::Unretained(this), |
| 221 status_callback)); | 220 status_callback)); |
| 222 } | 221 } |
| 223 | 222 |
| 224 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) { | 223 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) { |
| 225 DCHECK(!tokens.empty()); | 224 DCHECK(!tokens.empty()); |
| 226 | 225 |
| 227 scoped_ptr<ReportRequest> request(new ReportRequest); | 226 scoped_ptr<ReportRequest> request(new ReportRequest); |
| 228 for (size_t i = 0; i < tokens.size(); ++i) { | 227 for (const AudioToken& token : tokens) { |
| 229 if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token))) | 228 if (invalid_audio_token_cache_.HasKey(ToUrlSafe(token.token))) |
| 230 continue; | 229 continue; |
| 231 DVLOG(3) << "Sending token " << tokens[i].token << " to server."; | 230 DVLOG(3) << "Sending token " << token.token << " to server."; |
| 232 AddTokenToRequest(request.get(), tokens[i]); | 231 AddTokenToRequest(request.get(), token); |
| 233 } | 232 } |
| 234 SendReportRequest(request.Pass()); | 233 SendReportRequest(request.Pass()); |
| 235 } | 234 } |
| 236 | 235 |
| 237 void RpcHandler::ConnectToWhispernet() { | 236 void RpcHandler::ConnectToWhispernet() { |
| 238 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); | 237 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); |
| 239 | 238 |
| 240 // |directive_handler_| will be destructed with us, so unretained is safe. | 239 // |directive_handler_| will be destructed with us, so unretained is safe. |
| 241 directive_handler_.reset(new DirectiveHandler); | 240 directive_handler_.reset(new DirectiveHandler); |
| 242 directive_handler_->Initialize( | 241 directive_handler_->Initialize( |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 308 status_callback.Run(FAIL); | 307 status_callback.Run(FAIL); |
| 309 return; | 308 return; |
| 310 } | 309 } |
| 311 | 310 |
| 312 if (ReportErrorLogged(response)) { | 311 if (ReportErrorLogged(response)) { |
| 313 if (!status_callback.is_null()) | 312 if (!status_callback.is_null()) |
| 314 status_callback.Run(FAIL); | 313 status_callback.Run(FAIL); |
| 315 return; | 314 return; |
| 316 } | 315 } |
| 317 | 316 |
| 318 const RepeatedPtrField<MessageResult>& message_results = | 317 for (const MessageResult& result : |
| 319 response.manage_messages_response().published_message_result(); | 318 response.manage_messages_response().published_message_result()) { |
| 320 for (int i = 0; i < message_results.size(); ++i) { | 319 DVLOG(2) << "Published message with id " << result.published_message_id(); |
| 321 DVLOG(2) << "Published message with id " | |
| 322 << message_results.Get(i).published_message_id(); | |
| 323 } | 320 } |
| 324 | 321 |
| 325 const RepeatedPtrField<SubscriptionResult>& subscription_results = | 322 for (const SubscriptionResult& result : |
| 326 response.manage_subscriptions_response().subscription_result(); | 323 response.manage_subscriptions_response().subscription_result()) { |
| 327 for (int i = 0; i < subscription_results.size(); ++i) { | 324 DVLOG(2) << "Created subscription with id " << result.subscription_id(); |
| 328 DVLOG(2) << "Created subscription with id " | |
| 329 << subscription_results.Get(i).subscription_id(); | |
| 330 } | 325 } |
| 331 | 326 |
| 332 if (response.has_update_signals_response()) { | 327 if (response.has_update_signals_response()) { |
| 333 const UpdateSignalsResponse& update_response = | 328 const UpdateSignalsResponse& update_response = |
| 334 response.update_signals_response(); | 329 response.update_signals_response(); |
| 335 DispatchMessages(update_response.message()); | 330 DispatchMessages(update_response.message()); |
| 336 | 331 |
| 337 if (directive_handler_.get()) { | 332 if (directive_handler_.get()) { |
| 338 for (int i = 0; i < update_response.directive_size(); ++i) | 333 for (const Directive& directive : update_response.directive()) |
| 339 directive_handler_->AddDirective(update_response.directive(i)); | 334 directive_handler_->AddDirective(directive); |
| 340 } else { | 335 } else { |
| 341 DVLOG(1) << "No directive handler."; | 336 DVLOG(1) << "No directive handler."; |
| 342 } | 337 } |
| 343 | 338 |
| 344 const RepeatedPtrField<Token>& tokens = update_response.token(); | 339 for (const Token& token : update_response.token()) { |
| 345 for (int i = 0; i < tokens.size(); ++i) { | 340 switch (token.status()) { |
| 346 switch (tokens.Get(i).status()) { | |
| 347 case VALID: | 341 case VALID: |
| 348 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a | 342 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a |
| 349 // short TTL (like 10s) and send it up with every report request. | 343 // short TTL (like 10s) and send it up with every report request. |
| 350 // Then we'll still get messages while we're waiting to hear it again. | 344 // Then we'll still get messages while we're waiting to hear it again. |
| 351 VLOG(1) << "Got valid token " << tokens.Get(i).id(); | 345 VLOG(1) << "Got valid token " << token.id(); |
| 352 break; | 346 break; |
| 353 case INVALID: | 347 case INVALID: |
| 354 DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id(); | 348 DVLOG(3) << "Discarding invalid token " << token.id(); |
| 355 invalid_audio_token_cache_.Add(tokens.Get(i).id(), true); | 349 invalid_audio_token_cache_.Add(token.id(), true); |
| 356 break; | 350 break; |
| 357 default: | 351 default: |
| 358 DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code " | 352 DVLOG(2) << "Token " << token.id() << " has status code " |
| 359 << tokens.Get(i).status(); | 353 << token.status(); |
| 360 } | 354 } |
| 361 } | 355 } |
| 362 } | 356 } |
| 363 | 357 |
| 364 // TODO(ckehoe): Return a more detailed status response. | 358 // TODO(ckehoe): Return a more detailed status response. |
| 365 if (!status_callback.is_null()) | 359 if (!status_callback.is_null()) |
| 366 status_callback.Run(SUCCESS); | 360 status_callback.Run(SUCCESS); |
| 367 } | 361 } |
| 368 | 362 |
| 369 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) { | 363 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) { |
| 370 // Remove unpublishes. | 364 // Remove unpublishes. |
| 371 if (request.has_manage_messages_request()) { | 365 if (request.has_manage_messages_request()) { |
| 372 const RepeatedPtrField<std::string>& unpublishes = | 366 for (const std::string& unpublish : |
| 373 request.manage_messages_request().id_to_unpublish(); | 367 request.manage_messages_request().id_to_unpublish()) { |
| 374 for (int i = 0; i < unpublishes.size(); ++i) | 368 directive_handler_->RemoveDirectives(unpublish); |
| 375 directive_handler_->RemoveDirectives(unpublishes.Get(i)); | 369 } |
| 376 } | 370 } |
| 377 | 371 |
| 378 // Remove unsubscribes. | 372 // Remove unsubscribes. |
| 379 if (request.has_manage_subscriptions_request()) { | 373 if (request.has_manage_subscriptions_request()) { |
| 380 const RepeatedPtrField<std::string>& unsubscribes = | 374 for (const std::string& unsubscribe : |
| 381 request.manage_subscriptions_request().id_to_unsubscribe(); | 375 request.manage_subscriptions_request().id_to_unsubscribe()) { |
| 382 for (int i = 0; i < unsubscribes.size(); ++i) | 376 directive_handler_->RemoveDirectives(unsubscribe); |
| 383 directive_handler_->RemoveDirectives(unsubscribes.Get(i)); | 377 } |
| 384 } | 378 } |
| 385 } | 379 } |
| 386 | 380 |
| 387 void RpcHandler::AddPlayingTokens(ReportRequest* request) { | 381 void RpcHandler::AddPlayingTokens(ReportRequest* request) { |
| 388 if (!directive_handler_) | 382 if (!directive_handler_) |
| 389 return; | 383 return; |
| 390 | 384 |
| 391 const std::string& audible_token = directive_handler_->CurrentAudibleToken(); | 385 const std::string& audible_token = directive_handler_->CurrentAudibleToken(); |
| 392 const std::string& inaudible_token = | 386 const std::string& inaudible_token = |
| 393 directive_handler_->CurrentInaudibleToken(); | 387 directive_handler_->CurrentInaudibleToken(); |
| 394 | 388 |
| 395 if (!audible_token.empty()) | 389 if (!audible_token.empty()) |
| 396 AddTokenToRequest(request, AudioToken(audible_token, true)); | 390 AddTokenToRequest(request, AudioToken(audible_token, true)); |
| 397 if (!inaudible_token.empty()) | 391 if (!inaudible_token.empty()) |
| 398 AddTokenToRequest(request, AudioToken(inaudible_token, false)); | 392 AddTokenToRequest(request, AudioToken(inaudible_token, false)); |
| 399 } | 393 } |
| 400 | 394 |
| 401 void RpcHandler::DispatchMessages( | 395 void RpcHandler::DispatchMessages( |
| 402 const RepeatedPtrField<SubscribedMessage>& messages) { | 396 const RepeatedPtrField<SubscribedMessage>& messages) { |
| 403 if (messages.size() == 0) | 397 if (messages.size() == 0) |
| 404 return; | 398 return; |
| 405 | 399 |
| 406 // Index the messages by subscription id. | 400 // Index the messages by subscription id. |
| 407 std::map<std::string, std::vector<Message>> messages_by_subscription; | 401 std::map<std::string, std::vector<Message>> messages_by_subscription; |
| 408 DVLOG(3) << "Dispatching " << messages.size() << " messages"; | 402 DVLOG(3) << "Dispatching " << messages.size() << " messages"; |
| 409 for (int m = 0; m < messages.size(); ++m) { | 403 for (const SubscribedMessage& message : messages) { |
| 410 const RepeatedPtrField<std::string>& subscription_ids = | 404 for (const std::string& subscription_id : message.subscription_id()) { |
| 411 messages.Get(m).subscription_id(); | 405 messages_by_subscription[subscription_id].push_back( |
| 412 for (int s = 0; s < subscription_ids.size(); ++s) { | 406 message.published_message()); |
| 413 messages_by_subscription[subscription_ids.Get(s)].push_back( | |
| 414 messages.Get(m).published_message()); | |
| 415 } | 407 } |
| 416 } | 408 } |
| 417 | 409 |
| 418 // Send the messages for each subscription. | 410 // Send the messages for each subscription. |
| 419 for (std::map<std::string, std::vector<Message>>::const_iterator | 411 for (const std::pair<std::string, std::vector<Message>>& map_entry : |
|
rkc
2014/10/01 03:48:32
const auto&
Charlie
2014/10/06 19:09:56
At first I thought this was useful documentation.
| |
| 420 subscription = messages_by_subscription.begin(); | 412 messages_by_subscription) { |
| 421 subscription != messages_by_subscription.end(); | |
| 422 ++subscription) { | |
| 423 // TODO(ckehoe): Once we have the app ID from the server, we need to pass | 413 // TODO(ckehoe): Once we have the app ID from the server, we need to pass |
| 424 // it in here and get rid of the app id registry from the main API class. | 414 // it in here and get rid of the app id registry from the main API class. |
| 425 delegate_->HandleMessages("", subscription->first, subscription->second); | 415 const std::string& subscription = map_entry.first; |
| 416 const std::vector<Message>& messages = map_entry.second; | |
| 417 delegate_->HandleMessages("", subscription, messages); | |
| 426 } | 418 } |
| 427 } | 419 } |
| 428 | 420 |
| 429 RequestHeader* RpcHandler::CreateRequestHeader( | 421 RequestHeader* RpcHandler::CreateRequestHeader( |
| 430 const std::string& client_name) const { | 422 const std::string& client_name) const { |
| 431 RequestHeader* header = new RequestHeader; | 423 RequestHeader* header = new RequestHeader; |
| 432 | 424 |
| 433 header->set_allocated_framework_version(CreateVersion( | 425 header->set_allocated_framework_version(CreateVersion( |
| 434 "Chrome", delegate_->GetPlatformVersionString())); | 426 "Chrome", delegate_->GetPlatformVersionString())); |
| 435 if (!client_name.empty()) { | 427 if (!client_name.empty()) { |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 489 bool audible, | 481 bool audible, |
| 490 const WhispernetClient::SamplesCallback& samples_callback) { | 482 const WhispernetClient::SamplesCallback& samples_callback) { |
| 491 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); | 483 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); |
| 492 if (whispernet_client) { | 484 if (whispernet_client) { |
| 493 whispernet_client->RegisterSamplesCallback(samples_callback); | 485 whispernet_client->RegisterSamplesCallback(samples_callback); |
| 494 whispernet_client->EncodeToken(token, audible); | 486 whispernet_client->EncodeToken(token, audible); |
| 495 } | 487 } |
| 496 } | 488 } |
| 497 | 489 |
| 498 } // namespace copresence | 490 } // namespace copresence |
| OLD | NEW |