| Index: components/copresence/rpc/rpc_handler.cc
|
| diff --git a/components/copresence/rpc/rpc_handler.cc b/components/copresence/rpc/rpc_handler.cc
|
| index e5f12d0c4d969eaff957ac7b2aca360e232c0888..0b737c1ff107920428ecf902607699e154657aec 100644
|
| --- a/components/copresence/rpc/rpc_handler.cc
|
| +++ b/components/copresence/rpc/rpc_handler.cc
|
| @@ -1,42 +1,469 @@
|
| // Copyright 2014 The Chromium Authors. All rights reserved.
|
| -// Use of this source code is governed by a BSD-style license that can be
|
| -// found in the LICENSE file.
|
| +// Use of this source code is governed by a BSD-style license
|
| +// that can be found in the LICENSE file.
|
|
|
| #include "components/copresence/rpc/rpc_handler.h"
|
|
|
| +#include <map>
|
| +
|
| #include "base/bind.h"
|
| +#include "base/command_line.h"
|
| +#include "base/guid.h"
|
| +#include "base/logging.h"
|
| +#include "base/strings/string_util.h"
|
| +#include "base/time/time.h"
|
| +#include "components/copresence/copresence_switches.h"
|
| +#include "components/copresence/handlers/directive_handler.h"
|
| +#include "components/copresence/proto/codes.pb.h"
|
| #include "components/copresence/proto/data.pb.h"
|
| #include "components/copresence/proto/rpcs.pb.h"
|
| #include "components/copresence/public/copresence_client_delegate.h"
|
| -#include "components/copresence/public/whispernet_client.h"
|
| +#include "net/http/http_status_code.h"
|
| +
|
| +// TODO(ckehoe): Return error messages for bad requests.
|
|
|
| namespace copresence {
|
|
|
| -RpcHandler::RpcHandler(CopresenceClientDelegate* delegate,
|
| - SuccessCallback init_done_callback) {
|
| +using google::protobuf::MessageLite;
|
| +using google::protobuf::RepeatedPtrField;
|
| +
|
| +const char RpcHandler::kReportRequestRpcName[] = "report";
|
| +
|
| +namespace {
|
| +
|
| +// UrlSafe is defined as:
|
| +// '/' represented by a '_' and '+' represented by a '-'
|
| +// TODO(rkc): Move this to the wrapper.
|
| +std::string ToUrlSafe(std::string token) {
|
| + base::ReplaceChars(token, "+", "-", &token);
|
| + base::ReplaceChars(token, "/", "_", &token);
|
| + return token;
|
| }
|
|
|
| -RpcHandler::~RpcHandler() {
|
| +const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes.
|
| +const int kMaxInvalidTokens = 10000;
|
| +const char kRegisterDeviceRpcName[] = "registerdevice";
|
| +const char kDefaultCopresenceServer[] =
|
| + "https://www.googleapis.com/copresence/v2/copresence";
|
| +
|
| +// Logging
|
| +
|
| +// Checks for a copresence error. If there is one, logs it and returns true.
|
| +bool CopresenceErrorLogged(const Status& status) {
|
| + if (status.code() != OK) {
|
| + LOG(ERROR) << "Copresence error code " << status.code()
|
| + << (status.message().empty() ? std::string() :
|
| + ": " + status.message());
|
| + }
|
| + return status.code() != OK;
|
| }
|
|
|
| -void RpcHandler::SendReportRequest(
|
| - scoped_ptr<copresence::ReportRequest> request) {
|
| +void LogIfErrorStatus(const util::error::Code& code,
|
| + const std::string& context) {
|
| + LOG_IF(ERROR, code != util::error::OK)
|
| + << context << " error " << code << ". See "
|
| + << "cs/google3/util/task/codes.proto for more info.";
|
| }
|
|
|
| -void RpcHandler::SendReportRequest(
|
| - scoped_ptr<copresence::ReportRequest> request,
|
| - const std::string& app_id,
|
| - const StatusCallback& status_callback) {
|
| +// If any errors occurred, logs them and returns true.
|
| +bool ReportErrorLogged(const ReportResponse& response) {
|
| + bool result = CopresenceErrorLogged(response.header().status());
|
| +
|
| + // The Report fails or succeeds as a unit. If any responses had errors,
|
| + // the header will too. Thus we don't need to propagate individual errors.
|
| + if (response.has_update_signals_response())
|
| + LogIfErrorStatus(response.update_signals_response().status(), "Update");
|
| + if (response.has_manage_messages_response())
|
| + LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
|
| + if (response.has_manage_subscriptions_response()) {
|
| + LogIfErrorStatus(response.manage_subscriptions_response().status(),
|
| + "Subscribe");
|
| + }
|
| +
|
| + return result;
|
| +}
|
| +
|
| +// Request construction
|
| +
|
| +template <typename T>
|
| +BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
|
| + if (msg.has_token_exchange_strategy() &&
|
| + msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
|
| + return msg.token_exchange_strategy().broadcast_scan_configuration();
|
| + }
|
| + return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
|
| +}
|
| +
|
| +// This method will extract token exchange strategies
|
| +// from the publishes and subscribes in a report request.
|
| +// TODO(ckehoe): Delete this when the server supports
|
| +// BroadcastScanConfiguration.
|
| +BroadcastScanConfiguration ExtractTokenExchangeStrategy(
|
| + const ReportRequest& request) {
|
| + bool broadcast_only = false;
|
| + bool scan_only = false;
|
| +
|
| + // Strategies for publishes.
|
| + if (request.has_manage_messages_request()) {
|
| + const RepeatedPtrField<PublishedMessage> messages =
|
| + request.manage_messages_request().message_to_publish();
|
| + for (int i = 0; i < messages.size(); ++i) {
|
| + BroadcastScanConfiguration config =
|
| + GetBroadcastScanConfig(messages.Get(i));
|
| + broadcast_only = broadcast_only || config == BROADCAST_ONLY;
|
| + scan_only = scan_only || config == SCAN_ONLY;
|
| + if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
|
| + return BROADCAST_AND_SCAN;
|
| + }
|
| + }
|
| +
|
| + // Strategies for subscriptions.
|
| + if (request.has_manage_subscriptions_request()) {
|
| + const RepeatedPtrField<Subscription> messages =
|
| + request.manage_subscriptions_request().subscription();
|
| + for (int i = 0; i < messages.size(); ++i) {
|
| + BroadcastScanConfiguration config =
|
| + GetBroadcastScanConfig(messages.Get(i));
|
| + broadcast_only = broadcast_only || config == BROADCAST_ONLY;
|
| + scan_only = scan_only || config == SCAN_ONLY;
|
| + if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
|
| + return BROADCAST_AND_SCAN;
|
| + }
|
| + }
|
| +
|
| + if (broadcast_only)
|
| + return BROADCAST_ONLY;
|
| + if (scan_only)
|
| + return SCAN_ONLY;
|
| +
|
| + // If nothing else is specified, default to both broadcast and scan.
|
| + return BROADCAST_AND_SCAN;
|
| +}
|
| +
|
| +scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
|
| + scoped_ptr<DeviceState> state(new DeviceState);
|
| +
|
| + TokenTechnology* token_technology =
|
| + state->mutable_capabilities()->add_token_technology();
|
| + token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND);
|
| +
|
| + BroadcastScanConfiguration config =
|
| + ExtractTokenExchangeStrategy(request);
|
| + if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN)
|
| + token_technology->add_instruction_type(TRANSMIT);
|
| + if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN)
|
| + token_technology->add_instruction_type(RECEIVE);
|
| +
|
| + return state.Pass();
|
| +}
|
| +
|
| +// TODO(ckehoe): We're keeping this code in a separate function for now
|
| +// because we get a version string from Chrome, but the proto expects
|
| +// an int64 version. We should probably change the version proto
|
| +// to handle a more detailed version.
|
| +ClientVersion* CreateVersion(const std::string& client,
|
| + const std::string& version_name) {
|
| + ClientVersion* version = new ClientVersion;
|
| +
|
| + version->set_client(client);
|
| + version->set_version_name(version_name);
|
| +
|
| + return version;
|
| +}
|
| +
|
| +// Wrapper for the http post constructor. This is the default way
|
| +// to contact the server, but it can be overridden for testing.
|
| +void SendHttpPost(net::URLRequestContextGetter* url_context_getter,
|
| + const std::string& rpc_name,
|
| + scoped_ptr<MessageLite> request_proto,
|
| + const HttpPost::ResponseCallback& callback) {
|
| + // Create the base URL to call.
|
| + CommandLine* command_line = CommandLine::ForCurrentProcess();
|
| + const std::string copresence_server_host =
|
| + command_line->HasSwitch(switches::kCopresenceServer) ?
|
| + command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
|
| + kDefaultCopresenceServer;
|
| +
|
| + new HttpPost(url_context_getter,
|
| + copresence_server_host,
|
| + rpc_name,
|
| + *request_proto,
|
| + callback);
|
| +}
|
| +
|
| +} // namespace
|
| +
|
| +// Public methods
|
| +
|
| +RpcHandler::RpcHandler(CopresenceClientDelegate* delegate)
|
| + : delegate_(delegate),
|
| + invalid_audio_token_cache_(
|
| + base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
|
| + kMaxInvalidTokens),
|
| + server_post_callback_(base::Bind(&SendHttpPost)) {
|
| }
|
|
|
| -void RpcHandler::ReportTokens(copresence::TokenMedium medium,
|
| +RpcHandler::~RpcHandler() {}
|
| +
|
| +void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
|
| + scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
|
| + DCHECK(device_id_.empty());
|
| + device_id_ = delegate_->GetDeviceId();
|
| + if (!device_id_.empty()) {
|
| + init_done_callback.Run(true);
|
| + return;
|
| + }
|
| +
|
| + request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
|
| + Identity* identity =
|
| + request->mutable_device_identifiers()->mutable_registrant();
|
| + identity->set_type(CHROME);
|
| + identity->set_chrome_id(base::GenerateGUID());
|
| + SendServerRequest(
|
| + kRegisterDeviceRpcName,
|
| + std::string(),
|
| + request.Pass(),
|
| + base::Bind(&RpcHandler::RegisterResponseHandler,
|
| + AsWeakPtr(),
|
| + init_done_callback));
|
| +}
|
| +
|
| +void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
|
| + SendReportRequest(request.Pass(), std::string(), StatusCallback());
|
| +}
|
| +
|
| +void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
|
| + const std::string& app_id,
|
| + const StatusCallback& status_callback) {
|
| + DCHECK(request.get());
|
| + DCHECK(!device_id_.empty())
|
| + << "RpcHandler::Initialize() must complete successfully "
|
| + << "before other RpcHandler methods are called.";
|
| +
|
| + DVLOG(3) << "Sending report request to server.";
|
| +
|
| + request->mutable_update_signals_request()->set_allocated_state(
|
| + GetDeviceCapabilities(*request).release());
|
| + SendServerRequest(
|
| + kReportRequestRpcName,
|
| + app_id,
|
| + request.Pass(),
|
| + base::Bind(
|
| + &RpcHandler::ReportResponseHandler, AsWeakPtr(), status_callback));
|
| +}
|
| +
|
| +void RpcHandler::ReportTokens(TokenMedium medium,
|
| const std::vector<std::string>& tokens) {
|
| + DCHECK_EQ(medium, AUDIO_ULTRASOUND_PASSBAND);
|
| + DCHECK(!tokens.empty());
|
| +
|
| + scoped_ptr<ReportRequest> request(new ReportRequest);
|
| + for (size_t i = 0; i < tokens.size(); ++i) {
|
| + const std::string& token = ToUrlSafe(tokens[i]);
|
| + if (invalid_audio_token_cache_.HasKey(token))
|
| + continue;
|
| +
|
| + DVLOG(3) << "Sending token " << token << " to server.";
|
| +
|
| + TokenObservation* token_observation =
|
| + request->mutable_update_signals_request()->add_token_observation();
|
| + token_observation->set_token_id(token);
|
| +
|
| + TokenSignals* signals = token_observation->add_signals();
|
| + signals->set_medium(medium);
|
| + signals->set_observed_time_millis(base::Time::Now().ToJsTime());
|
| + }
|
| + SendReportRequest(request.Pass());
|
| }
|
|
|
| -void RpcHandler::ConnectToWhispernet(WhispernetClient* whispernet_client) {
|
| +void RpcHandler::ConnectToWhispernet() {
|
| + WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
|
| +
|
| + // |directive_handler_| will be destructed before us, so unretained is safe.
|
| + directive_handler_.reset(new DirectiveHandler);
|
| + directive_handler_->Initialize(
|
| + base::Bind(&WhispernetClient::DecodeSamples,
|
| + base::Unretained(whispernet_client)),
|
| + base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
|
| + base::Unretained(this)));
|
| +
|
| + whispernet_client->RegisterTokensCallback(
|
| + base::Bind(&RpcHandler::ReportTokens,
|
| + AsWeakPtr(),
|
| + AUDIO_ULTRASOUND_PASSBAND));
|
| }
|
|
|
| void RpcHandler::DisconnectFromWhispernet() {
|
| + directive_handler_.reset();
|
| +}
|
| +
|
| +// Private methods
|
| +
|
| +void RpcHandler::RegisterResponseHandler(
|
| + const SuccessCallback& init_done_callback,
|
| + int http_status_code,
|
| + const std::string& response_data) {
|
| + if (http_status_code != net::HTTP_OK) {
|
| + init_done_callback.Run(false);
|
| + return;
|
| + }
|
| +
|
| + RegisterDeviceResponse response;
|
| + if (!response.ParseFromString(response_data)) {
|
| + LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
|
| + init_done_callback.Run(false);
|
| + return;
|
| + }
|
| +
|
| + if (CopresenceErrorLogged(response.header().status()))
|
| + return;
|
| + device_id_ = response.registered_device_id();
|
| + DCHECK(!device_id_.empty());
|
| + DVLOG(2) << "Device registration successful: id " << device_id_;
|
| + delegate_->SaveDeviceId(device_id_);
|
| + init_done_callback.Run(true);
|
| +}
|
| +
|
| +void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
|
| + int http_status_code,
|
| + const std::string& response_data) {
|
| + if (http_status_code != net::HTTP_OK) {
|
| + if (!status_callback.is_null())
|
| + status_callback.Run(FAIL);
|
| + return;
|
| + }
|
| +
|
| + DVLOG(3) << "Received ReportResponse.";
|
| + ReportResponse response;
|
| + if (!response.ParseFromString(response_data)) {
|
| + LOG(ERROR) << "Invalid ReportResponse";
|
| + if (!status_callback.is_null())
|
| + status_callback.Run(FAIL);
|
| + return;
|
| + }
|
| +
|
| + if (ReportErrorLogged(response)) {
|
| + if (!status_callback.is_null())
|
| + status_callback.Run(FAIL);
|
| + return;
|
| + }
|
| +
|
| + const RepeatedPtrField<MessageResult>& message_results =
|
| + response.manage_messages_response().published_message_result();
|
| + for (int i = 0; i < message_results.size(); ++i) {
|
| + DVLOG(2) << "Published message with id "
|
| + << message_results.Get(i).published_message_id();
|
| + }
|
| +
|
| + const RepeatedPtrField<SubscriptionResult>& subscription_results =
|
| + response.manage_subscriptions_response().subscription_result();
|
| + for (int i = 0; i < subscription_results.size(); ++i) {
|
| + DVLOG(2) << "Created subscription with id "
|
| + << subscription_results.Get(i).subscription_id();
|
| + }
|
| +
|
| + if (response.has_update_signals_response()) {
|
| + const UpdateSignalsResponse& update_response =
|
| + response.update_signals_response();
|
| + DispatchMessages(update_response.message());
|
| +
|
| + if (directive_handler_.get()) {
|
| + for (int i = 0; i < update_response.directive_size(); ++i)
|
| + directive_handler_->AddDirective(update_response.directive(i));
|
| + } else {
|
| + DVLOG(1) << "No directive handler.";
|
| + }
|
| +
|
| + const RepeatedPtrField<Token>& tokens = update_response.token();
|
| + for (int i = 0; i < tokens.size(); ++i) {
|
| + switch (tokens.Get(i).status()) {
|
| + case VALID:
|
| + // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
|
| + // short TTL (like 10s) and send it up with every report request.
|
| + // Then we'll still get messages while we're waiting to hear it again.
|
| + VLOG(1) << "Got valid token " << tokens.Get(i).id();
|
| + break;
|
| + case INVALID:
|
| + DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
|
| + invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
|
| + break;
|
| + default:
|
| + DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
|
| + << tokens.Get(i).status();
|
| + }
|
| + }
|
| + }
|
| +
|
| + // TODO(ckehoe): Return a more detailed status response.
|
| + if (!status_callback.is_null())
|
| + status_callback.Run(SUCCESS);
|
| +}
|
| +
|
| +void RpcHandler::DispatchMessages(
|
| + const RepeatedPtrField<SubscribedMessage>& messages) {
|
| + if (messages.size() == 0)
|
| + return;
|
| +
|
| + // Index the messages by subscription id.
|
| + std::map<std::string, std::vector<Message> > messages_by_subscription;
|
| + DVLOG(3) << "Dispatching " << messages.size() << " messages";
|
| + for (int m = 0; m < messages.size(); ++m) {
|
| + const RepeatedPtrField<std::string>& subscription_ids =
|
| + messages.Get(m).subscription_id();
|
| + for (int s = 0; s < subscription_ids.size(); ++s) {
|
| + messages_by_subscription[subscription_ids.Get(s)].push_back(
|
| + messages.Get(m).published_message());
|
| + }
|
| + }
|
| +
|
| + // Send the messages for each subscription.
|
| + for (std::map<std::string, std::vector<Message> >::const_iterator
|
| + subscription = messages_by_subscription.begin();
|
| + subscription != messages_by_subscription.end();
|
| + ++subscription) {
|
| + // TODO(ckehoe): Once we have the app ID from the server, we need to pass
|
| + // it in here and get rid of the app id registry from the main API class.
|
| + delegate_->HandleMessages("", subscription->first, subscription->second);
|
| + }
|
| +}
|
| +
|
| +RequestHeader* RpcHandler::CreateRequestHeader(
|
| + const std::string& client_name) const {
|
| + RequestHeader* header = new RequestHeader;
|
| +
|
| + header->set_allocated_framework_version(
|
| + CreateVersion("Chrome", delegate_->GetPlatformVersionString()));
|
| + if (!client_name.empty()) {
|
| + header->set_allocated_client_version(
|
| + CreateVersion(client_name, std::string()));
|
| + }
|
| + header->set_current_time_millis(base::Time::Now().ToJsTime());
|
| + header->set_registered_device_id(device_id_);
|
| +
|
| + return header;
|
| +}
|
| +
|
| +template <class T>
|
| +void RpcHandler::SendServerRequest(
|
| + const std::string& rpc_name,
|
| + const std::string& app_id,
|
| + scoped_ptr<T> request,
|
| + const HttpPost::ResponseCallback& response_handler) {
|
| + request->set_allocated_header(CreateRequestHeader(app_id));
|
| + server_post_callback_.Run(delegate_->GetRequestContext(),
|
| + rpc_name,
|
| + make_scoped_ptr<MessageLite>(request.release()),
|
| + response_handler);
|
| +}
|
| +
|
| +void RpcHandler::AudioDirectiveListToWhispernetConnector(
|
| + const std::string& token,
|
| + const WhispernetClient::SamplesCallback& samples_callback) {
|
| + WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
|
| + if (whispernet_client) {
|
| + whispernet_client->RegisterSamplesCallback(samples_callback);
|
| + whispernet_client->EncodeToken(token);
|
| + }
|
| }
|
|
|
| } // namespace copresence
|
|
|