Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10669)

Unified Diff: components/copresence/rpc/rpc_handler.cc

Issue 433283002: Adding the Copresence RpcHandler and HttpPost helper. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@directive-handler
Patch Set: First round of review fixes Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: components/copresence/rpc/rpc_handler.cc
diff --git a/components/copresence/rpc/rpc_handler.cc b/components/copresence/rpc/rpc_handler.cc
index e5f12d0c4d969eaff957ac7b2aca360e232c0888..0b737c1ff107920428ecf902607699e154657aec 100644
--- a/components/copresence/rpc/rpc_handler.cc
+++ b/components/copresence/rpc/rpc_handler.cc
@@ -1,42 +1,469 @@
// Copyright 2014 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
+// Use of this source code is governed by a BSD-style license
+// that can be found in the LICENSE file.
#include "components/copresence/rpc/rpc_handler.h"
+#include <map>
+
#include "base/bind.h"
+#include "base/command_line.h"
+#include "base/guid.h"
+#include "base/logging.h"
+#include "base/strings/string_util.h"
+#include "base/time/time.h"
+#include "components/copresence/copresence_switches.h"
+#include "components/copresence/handlers/directive_handler.h"
+#include "components/copresence/proto/codes.pb.h"
#include "components/copresence/proto/data.pb.h"
#include "components/copresence/proto/rpcs.pb.h"
#include "components/copresence/public/copresence_client_delegate.h"
-#include "components/copresence/public/whispernet_client.h"
+#include "net/http/http_status_code.h"
+
+// TODO(ckehoe): Return error messages for bad requests.
namespace copresence {
-RpcHandler::RpcHandler(CopresenceClientDelegate* delegate,
- SuccessCallback init_done_callback) {
+using google::protobuf::MessageLite;
+using google::protobuf::RepeatedPtrField;
+
+const char RpcHandler::kReportRequestRpcName[] = "report";
+
+namespace {
+
+// UrlSafe is defined as:
+// '/' represented by a '_' and '+' represented by a '-'
+// TODO(rkc): Move this to the wrapper.
+std::string ToUrlSafe(std::string token) {
+ base::ReplaceChars(token, "+", "-", &token);
+ base::ReplaceChars(token, "/", "_", &token);
+ return token;
}
-RpcHandler::~RpcHandler() {
+const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes.
+const int kMaxInvalidTokens = 10000;
+const char kRegisterDeviceRpcName[] = "registerdevice";
+const char kDefaultCopresenceServer[] =
+ "https://www.googleapis.com/copresence/v2/copresence";
+
+// Logging
+
+// Checks for a copresence error. If there is one, logs it and returns true.
+bool CopresenceErrorLogged(const Status& status) {
+ if (status.code() != OK) {
+ LOG(ERROR) << "Copresence error code " << status.code()
+ << (status.message().empty() ? std::string() :
+ ": " + status.message());
+ }
+ return status.code() != OK;
}
-void RpcHandler::SendReportRequest(
- scoped_ptr<copresence::ReportRequest> request) {
+void LogIfErrorStatus(const util::error::Code& code,
+ const std::string& context) {
+ LOG_IF(ERROR, code != util::error::OK)
+ << context << " error " << code << ". See "
+ << "cs/google3/util/task/codes.proto for more info.";
}
-void RpcHandler::SendReportRequest(
- scoped_ptr<copresence::ReportRequest> request,
- const std::string& app_id,
- const StatusCallback& status_callback) {
+// If any errors occurred, logs them and returns true.
+bool ReportErrorLogged(const ReportResponse& response) {
+ bool result = CopresenceErrorLogged(response.header().status());
+
+ // The Report fails or succeeds as a unit. If any responses had errors,
+ // the header will too. Thus we don't need to propagate individual errors.
+ if (response.has_update_signals_response())
+ LogIfErrorStatus(response.update_signals_response().status(), "Update");
+ if (response.has_manage_messages_response())
+ LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
+ if (response.has_manage_subscriptions_response()) {
+ LogIfErrorStatus(response.manage_subscriptions_response().status(),
+ "Subscribe");
+ }
+
+ return result;
+}
+
+// Request construction
+
+template <typename T>
+BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
+ if (msg.has_token_exchange_strategy() &&
+ msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
+ return msg.token_exchange_strategy().broadcast_scan_configuration();
+ }
+ return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
+}
+
+// This method will extract token exchange strategies
+// from the publishes and subscribes in a report request.
+// TODO(ckehoe): Delete this when the server supports
+// BroadcastScanConfiguration.
+BroadcastScanConfiguration ExtractTokenExchangeStrategy(
+ const ReportRequest& request) {
+ bool broadcast_only = false;
+ bool scan_only = false;
+
+ // Strategies for publishes.
+ if (request.has_manage_messages_request()) {
+ const RepeatedPtrField<PublishedMessage> messages =
+ request.manage_messages_request().message_to_publish();
+ for (int i = 0; i < messages.size(); ++i) {
+ BroadcastScanConfiguration config =
+ GetBroadcastScanConfig(messages.Get(i));
+ broadcast_only = broadcast_only || config == BROADCAST_ONLY;
+ scan_only = scan_only || config == SCAN_ONLY;
+ if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
+ return BROADCAST_AND_SCAN;
+ }
+ }
+
+ // Strategies for subscriptions.
+ if (request.has_manage_subscriptions_request()) {
+ const RepeatedPtrField<Subscription> messages =
+ request.manage_subscriptions_request().subscription();
+ for (int i = 0; i < messages.size(); ++i) {
+ BroadcastScanConfiguration config =
+ GetBroadcastScanConfig(messages.Get(i));
+ broadcast_only = broadcast_only || config == BROADCAST_ONLY;
+ scan_only = scan_only || config == SCAN_ONLY;
+ if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
+ return BROADCAST_AND_SCAN;
+ }
+ }
+
+ if (broadcast_only)
+ return BROADCAST_ONLY;
+ if (scan_only)
+ return SCAN_ONLY;
+
+ // If nothing else is specified, default to both broadcast and scan.
+ return BROADCAST_AND_SCAN;
+}
+
+scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
+ scoped_ptr<DeviceState> state(new DeviceState);
+
+ TokenTechnology* token_technology =
+ state->mutable_capabilities()->add_token_technology();
+ token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND);
+
+ BroadcastScanConfiguration config =
+ ExtractTokenExchangeStrategy(request);
+ if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN)
+ token_technology->add_instruction_type(TRANSMIT);
+ if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN)
+ token_technology->add_instruction_type(RECEIVE);
+
+ return state.Pass();
+}
+
+// TODO(ckehoe): We're keeping this code in a separate function for now
+// because we get a version string from Chrome, but the proto expects
+// an int64 version. We should probably change the version proto
+// to handle a more detailed version.
+ClientVersion* CreateVersion(const std::string& client,
+ const std::string& version_name) {
+ ClientVersion* version = new ClientVersion;
+
+ version->set_client(client);
+ version->set_version_name(version_name);
+
+ return version;
+}
+
+// Wrapper for the http post constructor. This is the default way
+// to contact the server, but it can be overridden for testing.
+void SendHttpPost(net::URLRequestContextGetter* url_context_getter,
+ const std::string& rpc_name,
+ scoped_ptr<MessageLite> request_proto,
+ const HttpPost::ResponseCallback& callback) {
+ // Create the base URL to call.
+ CommandLine* command_line = CommandLine::ForCurrentProcess();
+ const std::string copresence_server_host =
+ command_line->HasSwitch(switches::kCopresenceServer) ?
+ command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
+ kDefaultCopresenceServer;
+
+ new HttpPost(url_context_getter,
+ copresence_server_host,
+ rpc_name,
+ *request_proto,
+ callback);
+}
+
+} // namespace
+
+// Public methods
+
+RpcHandler::RpcHandler(CopresenceClientDelegate* delegate)
+ : delegate_(delegate),
+ invalid_audio_token_cache_(
+ base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
+ kMaxInvalidTokens),
+ server_post_callback_(base::Bind(&SendHttpPost)) {
}
-void RpcHandler::ReportTokens(copresence::TokenMedium medium,
+RpcHandler::~RpcHandler() {}
+
+void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
+ scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
+ DCHECK(device_id_.empty());
+ device_id_ = delegate_->GetDeviceId();
+ if (!device_id_.empty()) {
+ init_done_callback.Run(true);
+ return;
+ }
+
+ request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
+ Identity* identity =
+ request->mutable_device_identifiers()->mutable_registrant();
+ identity->set_type(CHROME);
+ identity->set_chrome_id(base::GenerateGUID());
+ SendServerRequest(
+ kRegisterDeviceRpcName,
+ std::string(),
+ request.Pass(),
+ base::Bind(&RpcHandler::RegisterResponseHandler,
+ AsWeakPtr(),
+ init_done_callback));
+}
+
+void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
+ SendReportRequest(request.Pass(), std::string(), StatusCallback());
+}
+
+void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
+ const std::string& app_id,
+ const StatusCallback& status_callback) {
+ DCHECK(request.get());
+ DCHECK(!device_id_.empty())
+ << "RpcHandler::Initialize() must complete successfully "
+ << "before other RpcHandler methods are called.";
+
+ DVLOG(3) << "Sending report request to server.";
+
+ request->mutable_update_signals_request()->set_allocated_state(
+ GetDeviceCapabilities(*request).release());
+ SendServerRequest(
+ kReportRequestRpcName,
+ app_id,
+ request.Pass(),
+ base::Bind(
+ &RpcHandler::ReportResponseHandler, AsWeakPtr(), status_callback));
+}
+
+void RpcHandler::ReportTokens(TokenMedium medium,
const std::vector<std::string>& tokens) {
+ DCHECK_EQ(medium, AUDIO_ULTRASOUND_PASSBAND);
+ DCHECK(!tokens.empty());
+
+ scoped_ptr<ReportRequest> request(new ReportRequest);
+ for (size_t i = 0; i < tokens.size(); ++i) {
+ const std::string& token = ToUrlSafe(tokens[i]);
+ if (invalid_audio_token_cache_.HasKey(token))
+ continue;
+
+ DVLOG(3) << "Sending token " << token << " to server.";
+
+ TokenObservation* token_observation =
+ request->mutable_update_signals_request()->add_token_observation();
+ token_observation->set_token_id(token);
+
+ TokenSignals* signals = token_observation->add_signals();
+ signals->set_medium(medium);
+ signals->set_observed_time_millis(base::Time::Now().ToJsTime());
+ }
+ SendReportRequest(request.Pass());
}
-void RpcHandler::ConnectToWhispernet(WhispernetClient* whispernet_client) {
+void RpcHandler::ConnectToWhispernet() {
+ WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
+
+ // |directive_handler_| will be destructed before us, so unretained is safe.
+ directive_handler_.reset(new DirectiveHandler);
+ directive_handler_->Initialize(
+ base::Bind(&WhispernetClient::DecodeSamples,
+ base::Unretained(whispernet_client)),
+ base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
+ base::Unretained(this)));
+
+ whispernet_client->RegisterTokensCallback(
+ base::Bind(&RpcHandler::ReportTokens,
+ AsWeakPtr(),
+ AUDIO_ULTRASOUND_PASSBAND));
}
void RpcHandler::DisconnectFromWhispernet() {
+ directive_handler_.reset();
+}
+
+// Private methods
+
+void RpcHandler::RegisterResponseHandler(
+ const SuccessCallback& init_done_callback,
+ int http_status_code,
+ const std::string& response_data) {
+ if (http_status_code != net::HTTP_OK) {
+ init_done_callback.Run(false);
+ return;
+ }
+
+ RegisterDeviceResponse response;
+ if (!response.ParseFromString(response_data)) {
+ LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
+ init_done_callback.Run(false);
+ return;
+ }
+
+ if (CopresenceErrorLogged(response.header().status()))
+ return;
+ device_id_ = response.registered_device_id();
+ DCHECK(!device_id_.empty());
+ DVLOG(2) << "Device registration successful: id " << device_id_;
+ delegate_->SaveDeviceId(device_id_);
+ init_done_callback.Run(true);
+}
+
+void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
+ int http_status_code,
+ const std::string& response_data) {
+ if (http_status_code != net::HTTP_OK) {
+ if (!status_callback.is_null())
+ status_callback.Run(FAIL);
+ return;
+ }
+
+ DVLOG(3) << "Received ReportResponse.";
+ ReportResponse response;
+ if (!response.ParseFromString(response_data)) {
+ LOG(ERROR) << "Invalid ReportResponse";
+ if (!status_callback.is_null())
+ status_callback.Run(FAIL);
+ return;
+ }
+
+ if (ReportErrorLogged(response)) {
+ if (!status_callback.is_null())
+ status_callback.Run(FAIL);
+ return;
+ }
+
+ const RepeatedPtrField<MessageResult>& message_results =
+ response.manage_messages_response().published_message_result();
+ for (int i = 0; i < message_results.size(); ++i) {
+ DVLOG(2) << "Published message with id "
+ << message_results.Get(i).published_message_id();
+ }
+
+ const RepeatedPtrField<SubscriptionResult>& subscription_results =
+ response.manage_subscriptions_response().subscription_result();
+ for (int i = 0; i < subscription_results.size(); ++i) {
+ DVLOG(2) << "Created subscription with id "
+ << subscription_results.Get(i).subscription_id();
+ }
+
+ if (response.has_update_signals_response()) {
+ const UpdateSignalsResponse& update_response =
+ response.update_signals_response();
+ DispatchMessages(update_response.message());
+
+ if (directive_handler_.get()) {
+ for (int i = 0; i < update_response.directive_size(); ++i)
+ directive_handler_->AddDirective(update_response.directive(i));
+ } else {
+ DVLOG(1) << "No directive handler.";
+ }
+
+ const RepeatedPtrField<Token>& tokens = update_response.token();
+ for (int i = 0; i < tokens.size(); ++i) {
+ switch (tokens.Get(i).status()) {
+ case VALID:
+ // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
+ // short TTL (like 10s) and send it up with every report request.
+ // Then we'll still get messages while we're waiting to hear it again.
+ VLOG(1) << "Got valid token " << tokens.Get(i).id();
+ break;
+ case INVALID:
+ DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
+ invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
+ break;
+ default:
+ DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
+ << tokens.Get(i).status();
+ }
+ }
+ }
+
+ // TODO(ckehoe): Return a more detailed status response.
+ if (!status_callback.is_null())
+ status_callback.Run(SUCCESS);
+}
+
+void RpcHandler::DispatchMessages(
+ const RepeatedPtrField<SubscribedMessage>& messages) {
+ if (messages.size() == 0)
+ return;
+
+ // Index the messages by subscription id.
+ std::map<std::string, std::vector<Message> > messages_by_subscription;
+ DVLOG(3) << "Dispatching " << messages.size() << " messages";
+ for (int m = 0; m < messages.size(); ++m) {
+ const RepeatedPtrField<std::string>& subscription_ids =
+ messages.Get(m).subscription_id();
+ for (int s = 0; s < subscription_ids.size(); ++s) {
+ messages_by_subscription[subscription_ids.Get(s)].push_back(
+ messages.Get(m).published_message());
+ }
+ }
+
+ // Send the messages for each subscription.
+ for (std::map<std::string, std::vector<Message> >::const_iterator
+ subscription = messages_by_subscription.begin();
+ subscription != messages_by_subscription.end();
+ ++subscription) {
+ // TODO(ckehoe): Once we have the app ID from the server, we need to pass
+ // it in here and get rid of the app id registry from the main API class.
+ delegate_->HandleMessages("", subscription->first, subscription->second);
+ }
+}
+
+RequestHeader* RpcHandler::CreateRequestHeader(
+ const std::string& client_name) const {
+ RequestHeader* header = new RequestHeader;
+
+ header->set_allocated_framework_version(
+ CreateVersion("Chrome", delegate_->GetPlatformVersionString()));
+ if (!client_name.empty()) {
+ header->set_allocated_client_version(
+ CreateVersion(client_name, std::string()));
+ }
+ header->set_current_time_millis(base::Time::Now().ToJsTime());
+ header->set_registered_device_id(device_id_);
+
+ return header;
+}
+
+template <class T>
+void RpcHandler::SendServerRequest(
+ const std::string& rpc_name,
+ const std::string& app_id,
+ scoped_ptr<T> request,
+ const HttpPost::ResponseCallback& response_handler) {
+ request->set_allocated_header(CreateRequestHeader(app_id));
+ server_post_callback_.Run(delegate_->GetRequestContext(),
+ rpc_name,
+ make_scoped_ptr<MessageLite>(request.release()),
+ response_handler);
+}
+
+void RpcHandler::AudioDirectiveListToWhispernetConnector(
+ const std::string& token,
+ const WhispernetClient::SamplesCallback& samples_callback) {
+ WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
+ if (whispernet_client) {
+ whispernet_client->RegisterSamplesCallback(samples_callback);
+ whispernet_client->EncodeToken(token);
+ }
}
} // namespace copresence

Powered by Google App Engine
This is Rietveld 408576698