Chromium Code Reviews| Index: components/copresence/rpc/rpc_handler.cc |
| diff --git a/components/copresence/rpc/rpc_handler.cc b/components/copresence/rpc/rpc_handler.cc |
| index e5f12d0c4d969eaff957ac7b2aca360e232c0888..581d225abdda6ee2d55e48566b3215c9f17ab4d1 100644 |
| --- a/components/copresence/rpc/rpc_handler.cc |
| +++ b/components/copresence/rpc/rpc_handler.cc |
| @@ -1,42 +1,455 @@ |
| // Copyright 2014 The Chromium Authors. All rights reserved. |
| -// Use of this source code is governed by a BSD-style license that can be |
| -// found in the LICENSE file. |
| +// Use of this source code is governed by a BSD-style license |
| +// that can be found in the LICENSE file. |
| #include "components/copresence/rpc/rpc_handler.h" |
| +#include <map> |
| +#include <sstream> |
| + |
| #include "base/bind.h" |
| +#include "base/guid.h" |
| +#include "base/logging.h" |
| +#include "base/strings/string_util.h" |
| +#include "base/time/time.h" |
| +#include "components/copresence/handlers/directive_handler.h" |
| +#include "components/copresence/proto/codes.pb.h" |
| #include "components/copresence/proto/data.pb.h" |
| #include "components/copresence/proto/rpcs.pb.h" |
| #include "components/copresence/public/copresence_client_delegate.h" |
| -#include "components/copresence/public/whispernet_client.h" |
| +#include "net/http/http_status_code.h" |
| + |
| +// TODO(ckehoe): Return error messages for bad requests. |
| namespace copresence { |
| -RpcHandler::RpcHandler(CopresenceClientDelegate* delegate, |
| - SuccessCallback init_done_callback) { |
| +using google::protobuf::MessageLite; |
| +using google::protobuf::RepeatedPtrField; |
| + |
| +namespace { |
| + |
| +// UrlSafe is defined as: |
| +// '/' represented by a '_' and '+' represented by a '-' |
| +// TODO(rkc): Move this to the wrapper. |
|
Daniel Erat
2014/08/06 16:01:17
if this is duplicated now (i know i saw it earlier
Charlie
2014/08/06 19:32:19
I have his CLs patched in, but a grep of my tree d
Daniel Erat
2014/08/06 21:35:18
yeah, that's probably what i'm thinking of.
Charlie
2014/08/06 22:36:23
Acknowledged.
|
| +std::string ToUrlSafe(std::string token) { |
| + base::ReplaceChars(token, "+", "-", &token); |
| + base::ReplaceChars(token, "/", "_", &token); |
| + return token; |
| } |
| -RpcHandler::~RpcHandler() { |
| +const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes. |
| +const int kMaxInvalidTokens = 10000; |
| + |
| +const char kRegisterDeviceRpcName[] = "registerdevice"; |
| +const char kReportRequestRpcName[] = "report"; |
|
Daniel Erat
2014/08/06 16:01:16
make this be a public static const member so it is
Charlie
2014/08/06 19:32:19
Done.
|
| + |
| +// Logging |
| + |
| +// Logs the status and returns true if the status was OK. |
|
Daniel Erat
2014/08/06 16:01:16
this looks like it only logs the status if it was
Charlie
2014/08/06 19:32:19
Done.
|
| +bool LogStatus(const Status& status) { |
| + if (status.code() == OK) |
| + return true; |
| + |
| + std::stringstream log_message; |
|
Daniel Erat
2014/08/06 16:01:16
the style guide says not to use streams except for
Charlie
2014/08/06 19:32:19
Sure. Done.
On 2014/08/06 16:01:16, Daniel Erat w
|
| + log_message << "Copresence error code " << status.code(); |
| + if (!status.message().empty()) { |
| + log_message << ": " << status.message(); |
| + } |
| + |
| + LOG(ERROR) << log_message.str(); |
| + return false; |
| } |
| -void RpcHandler::SendReportRequest( |
| - scoped_ptr<copresence::ReportRequest> request) { |
| +void LogStatus(const util::error::Code& code, const std::string& context) { |
|
Daniel Erat
2014/08/06 16:01:16
don't overload function names
Charlie
2014/08/06 19:32:20
Done.
|
| + LOG_IF(ERROR, code != util::error::OK) |
| + << context << " error " << code << ". See " |
| + << "cs/google3/util/task/codes.proto for more info."; |
| } |
| -void RpcHandler::SendReportRequest( |
| - scoped_ptr<copresence::ReportRequest> request, |
| - const std::string& app_id, |
| - const StatusCallback& status_callback) { |
| +// Logs the status and returns true if the status was OK. |
| +bool LogStatus(const ReportResponse& response) { |
|
Daniel Erat
2014/08/06 16:01:17
don't overload function names
Charlie
2014/08/06 19:32:19
Done.
|
| + bool result = LogStatus(response.header().status()); |
| + if (response.has_manage_messages_response()) { |
|
Daniel Erat
2014/08/06 16:01:16
omit curly brackets for single-line statements
Charlie
2014/08/06 19:32:19
Done.
|
| + LogStatus(response.manage_messages_response().status(), "Publish"); |
| + } |
| + if (response.has_manage_subscriptions_response()) { |
| + LogStatus(response.manage_subscriptions_response().status(), "Subscribe"); |
| + } |
| + if (response.has_update_signals_response()) { |
| + LogStatus(response.update_signals_response().status(), "Update"); |
| + } |
| + return result; |
| +} |
| + |
| +// Request construction |
|
Daniel Erat
2014/08/06 16:01:16
nit: add trailing period
Charlie
2014/08/06 19:32:19
This is supposed to be a section header. Added a n
|
| +template <typename T> |
| +BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) { |
| + if (msg.has_token_exchange_strategy() && |
| + msg.token_exchange_strategy().has_broadcast_scan_configuration()) { |
| + return msg.token_exchange_strategy().broadcast_scan_configuration(); |
| + } |
| + return BROADCAST_SCAN_CONFIGURATION_UNKNOWN; |
| +} |
| + |
| +// This method will extract token exchange strategies |
| +// from the publishes and subscribes in a report request. |
| +BroadcastScanConfiguration ExtractTokenExchangeStrategy( |
| + ReportRequest* request) { |
|
Daniel Erat
2014/08/06 16:01:17
can this be a const reference?
Charlie
2014/08/06 19:32:19
Yes. Done.
On 2014/08/06 16:01:17, Daniel Erat wr
|
| + bool broadcast_only = false; |
| + bool scan_only = false; |
| + |
| + // Strategies for publishes. |
| + if (request->has_manage_messages_request()) { |
| + const RepeatedPtrField<PublishedMessage> messages = |
| + request->manage_messages_request().message_to_publish(); |
| + for (int i = 0; i < messages.size(); ++i) { |
| + BroadcastScanConfiguration config = |
| + GetBroadcastScanConfig(messages.Get(i)); |
| + broadcast_only = broadcast_only || config == BROADCAST_ONLY; |
| + scan_only = scan_only || config == SCAN_ONLY; |
| + if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only)) |
| + return BROADCAST_AND_SCAN; |
| + } |
| + } |
| + |
| + // Strategies for subscriptions. |
| + if (request->has_manage_subscriptions_request()) { |
| + const RepeatedPtrField<Subscription> messages = |
| + request->manage_subscriptions_request().subscription(); |
| + for (int i = 0; i < messages.size(); ++i) { |
| + BroadcastScanConfiguration config = |
| + GetBroadcastScanConfig(messages.Get(i)); |
| + broadcast_only = broadcast_only || config == BROADCAST_ONLY; |
| + scan_only = scan_only || config == SCAN_ONLY; |
| + if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only)) |
| + return BROADCAST_AND_SCAN; |
| + } |
| + } |
| + |
| + if (broadcast_only) |
| + return BROADCAST_ONLY; |
| + if (scan_only) |
| + return SCAN_ONLY; |
| + return BROADCAST_AND_SCAN; |
|
Daniel Erat
2014/08/06 16:01:16
do you need to check whether both broadcast_only a
Charlie
2014/08/06 19:32:20
No, this is our default. Added a comment.
On 2014
|
| +} |
| + |
| +DeviceState* GetDeviceCapabilities( |
|
Daniel Erat
2014/08/06 16:01:16
return a scoped_ptr instead so the ownership trans
Charlie
2014/08/06 19:32:19
Done.
|
| + ReportRequest* request) { |
|
Daniel Erat
2014/08/06 16:01:16
can this be a const reference?
also, unwrap this
Charlie
2014/08/06 19:32:19
Done.
|
| + DeviceState* state = new DeviceState; |
| + |
| + TokenTechnology* token_technology = |
| + state->mutable_capabilities()->add_token_technology(); |
| + token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND); |
| + |
| + BroadcastScanConfiguration config = |
| + ExtractTokenExchangeStrategy(request); |
| + if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN) |
| + token_technology->add_instruction_type(TRANSMIT); |
| + if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN) |
| + token_technology->add_instruction_type(RECEIVE); |
| + |
| + return state; |
| +} |
| + |
| +// TODO(ckehoe): We're keeping this code in a separate function for now |
| +// because we get a version string from Chrome, but the proto expects |
| +// an int64 version. We should probably change the version proto |
| +// to handle a more detailed version. |
| +ClientVersion* CreateVersion(const std::string& client, |
| + const std::string& version_name) { |
| + ClientVersion* version = new ClientVersion; |
| + |
| + version->set_client(client); |
| + version->set_version_name(version_name); |
| + |
| + return version; |
| +} |
| + |
| +// Wrapper for the http post constructor. This is the default way |
| +// to contact the server, but it can be overridden for testing. |
| +void SendHttpPost(net::URLRequestContextGetter* url_context_getter, |
| + const std::string& rpc_name, |
| + scoped_ptr<MessageLite> request_proto, |
| + const HttpPost::ResponseCallback& callback) { |
| + new HttpPost(url_context_getter, rpc_name, request_proto.Pass(), callback); |
| +} |
| + |
| +} // namespace |
| + |
| +// Public methods |
| + |
| +RpcHandler::RpcHandler(CopresenceClientDelegate* delegate) |
| + : delegate_(delegate), |
| + invalid_audio_token_cache_( |
| + base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs), |
| + kMaxInvalidTokens), |
| + server_post_callback_(base::Bind(&SendHttpPost)) { |
| +} |
| + |
| +RpcHandler::~RpcHandler() {} |
| + |
| +void RpcHandler::Initialize(const SuccessCallback& init_done_callback) { |
| + scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest); |
| + DCHECK(device_id_.empty()); |
| + device_id_ = delegate_->GetDeviceId(); |
| + if (!device_id_.empty()) { |
| + init_done_callback.Run(true); |
| + return; |
| + } |
| + |
| + request->mutable_push_service()->set_service(PUSH_SERVICE_NONE); |
| + Identity* identity = |
| + request->mutable_device_identifiers()->mutable_registrant(); |
| + identity->set_type(CHROME); |
| + identity->set_chrome_id(base::GenerateGUID()); |
| + SendServerRequest( |
| + kRegisterDeviceRpcName, |
| + std::string(), |
| + request.Pass(), |
| + base::Bind(&RpcHandler::RegisterResponseHandler, |
| + AsWeakPtr(), |
| + init_done_callback)); |
| +} |
| + |
| +void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) { |
| + SendReportRequest(request.Pass(), std::string(), StatusCallback()); |
| } |
| -void RpcHandler::ReportTokens(copresence::TokenMedium medium, |
| +void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request, |
| + const std::string& app_id, |
| + const StatusCallback& status_callback) { |
| + DCHECK(request.get()); |
| + DCHECK(!device_id_.empty()) |
| + << "\nRpcHandler::Initialize() must complete successfully " |
|
Daniel Erat
2014/08/06 16:01:17
why do you have a newline at the beginning of this
Charlie
2014/08/06 19:32:19
I liked the log message better all on one line. Bu
|
| + << "before other RpcHandler methods are called."; |
| + |
| + DVLOG(3) << "Sending report request to server."; |
| + |
| + request->mutable_update_signals_request()->set_allocated_state( |
| + GetDeviceCapabilities(request.get())); |
| + SendServerRequest( |
| + kReportRequestRpcName, |
| + app_id, |
| + request.Pass(), |
| + base::Bind( |
| + &RpcHandler::ReportResponseHandler, AsWeakPtr(), status_callback)); |
| +} |
| + |
| +void RpcHandler::ReportTokens(TokenMedium medium, |
| const std::vector<std::string>& tokens) { |
| + DCHECK_EQ(medium, AUDIO_ULTRASOUND_PASSBAND); |
| + DCHECK(!tokens.empty()); |
| + |
| + scoped_ptr<ReportRequest> request(new ReportRequest); |
| + for (size_t i = 0; i < tokens.size(); ++i) { |
| + const std::string& token = ToUrlSafe(tokens[i]); |
| + if (invalid_audio_token_cache_.HasKey(token)) |
| + continue; |
| + |
| + DVLOG(3) << "Sending token " << token << " to server."; |
| + |
| + TokenObservation* token_observation = |
| + request->mutable_update_signals_request()->add_token_observation(); |
| + token_observation->set_token_id(token); |
| + |
| + TokenSignals* signals = token_observation->add_signals(); |
| + signals->set_medium(medium); |
| + signals->set_observed_time_millis(base::Time::Now().ToJsTime()); |
| + } |
| + SendReportRequest(request.Pass()); |
| } |
| -void RpcHandler::ConnectToWhispernet(WhispernetClient* whispernet_client) { |
| +void RpcHandler::ConnectToWhispernet() { |
| + WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); |
| + |
| + // Directive_handler will be destructed before us, so unretained is safe. |
|
Daniel Erat
2014/08/06 16:01:17
s/Directive_handler/|directive_handler_|/
Charlie
2014/08/06 19:32:20
Done.
|
| + directive_handler_.reset(new DirectiveHandler( |
| + base::Bind(&WhispernetClient::DecodeSamples, |
| + base::Unretained(whispernet_client)), |
| + base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector, |
| + base::Unretained(this)))); |
| + |
| + whispernet_client->RegisterTokensCallback( |
| + base::Bind(&RpcHandler::ReportTokens, |
| + base::Unretained(this), |
|
xiyuan
2014/08/05 21:33:39
AsWeakPtr()?
Charlie
2014/08/06 19:32:20
Done.
|
| + AUDIO_ULTRASOUND_PASSBAND)); |
| } |
| void RpcHandler::DisconnectFromWhispernet() { |
| + directive_handler_.reset(); |
| +} |
| + |
| +// Private methods |
| + |
| +void RpcHandler::RegisterResponseHandler( |
| + const SuccessCallback& init_done_callback, |
| + int http_status_code, |
| + const std::string& response_data) { |
| + if (http_status_code != net::HTTP_OK) { |
| + init_done_callback.Run(false); |
| + return; |
| + } |
| + |
| + RegisterDeviceResponse response; |
| + if (!response.ParseFromString(response_data)) { |
| + LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data; |
| + init_done_callback.Run(false); |
| + return; |
| + } |
| + |
| + if (!LogStatus(response.header().status())) |
| + return; |
| + device_id_ = response.registered_device_id(); |
| + DCHECK(!device_id_.empty()); |
| + DVLOG(2) << "Device registration successful: id " << device_id_; |
| + delegate_->SaveDeviceId(device_id_); |
| + init_done_callback.Run(true); |
| +} |
| + |
| +void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, |
| + int http_status_code, |
| + const std::string& response_data) { |
| + if (http_status_code != net::HTTP_OK) { |
| + if (!status_callback.is_null()) |
| + status_callback.Run(FAIL); |
| + return; |
| + } |
| + |
| + DVLOG(3) << "Received ReportResponse."; |
| + ReportResponse response; |
| + if (!response.ParseFromString(response_data)) { |
| + LOG(ERROR) << "Invalid ReportResponse"; |
| + if (!status_callback.is_null()) |
| + status_callback.Run(FAIL); |
| + return; |
| + } |
| + |
| + if (!LogStatus(response)) { |
| + if (!status_callback.is_null()) |
| + status_callback.Run(FAIL); |
| + return; |
| + } |
| + |
| + if (response.has_manage_messages_response()) { |
| + RepeatedPtrField<MessageResult> message_results = |
| + response.manage_messages_response().published_message_result(); |
| + for (int i = 0; i < message_results.size(); ++i) { |
| + DVLOG(2) << "Published message with id " |
| + << message_results.Get(i).published_message_id(); |
| + } |
| + } |
| + |
| + if (response.has_manage_subscriptions_response()) { |
| + RepeatedPtrField<SubscriptionResult> subscription_results = |
| + response.manage_subscriptions_response().subscription_result(); |
| + for (int i = 0; i < subscription_results.size(); ++i) { |
| + DVLOG(2) << "Created subscription with id " |
| + << subscription_results.Get(i).subscription_id(); |
| + } |
| + } |
| + |
| + if (response.has_update_signals_response()) { |
| + const UpdateSignalsResponse& update_response = |
| + response.update_signals_response(); |
| + DispatchMessages(update_response.message()); |
| + |
| + if (directive_handler_.get()) { |
| + for (int i = 0; i < update_response.directive_size(); ++i) |
| + directive_handler_->AddDirective(update_response.directive(i)); |
| + } else { |
| + DVLOG(1) << "No directive handler."; |
| + } |
| + |
| + RepeatedPtrField<Token> tokens = update_response.token(); |
| + for (int i = 0; i < tokens.size(); ++i) { |
| + switch (tokens.Get(i).status()) { |
| + case VALID: |
| + // TODO(rkc/ckehoe): Store the token in a valid_token_cache_ with a |
|
Daniel Erat
2014/08/06 16:01:16
nit: |valid_token_cache_|
Charlie
2014/08/06 19:32:19
Done.
|
| + // short ttl (like 10s) and send it up with every report request. |
|
Daniel Erat
2014/08/06 16:01:16
nit: s/ttl/TTL/
Charlie
2014/08/06 19:32:19
Done.
|
| + // Then we'll still get messages while we're waiting to hear it again. |
| + VLOG(1) << "Got valid token " << tokens.Get(i).id(); |
| + break; |
| + case INVALID: |
| + DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id(); |
| + invalid_audio_token_cache_.Add(tokens.Get(i).id(), true); |
| + break; |
| + default: |
| + DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code " |
| + << tokens.Get(i).status(); |
| + } |
| + } |
| + } |
| + |
| + // TODO(ckehoe): Return a more detailed status response. |
| + if (!status_callback.is_null()) |
| + status_callback.Run(SUCCESS); |
| +} |
| + |
| +void RpcHandler::DispatchMessages( |
| + const RepeatedPtrField<SubscribedMessage>& messages) { |
| + if (messages.size() == 0) |
| + return; |
| + |
| + // Index the messages by subscription id. |
| + std::map<std::string, std::vector<Message> > messages_by_subscription; |
| + DVLOG(3) << "Dispatching " << messages.size() << " messages"; |
| + for (int m = 0; m < messages.size(); ++m) { |
| + const RepeatedPtrField<std::string>& subscription_ids = |
| + messages.Get(m).subscription_id(); |
| + for (int s = 0; s < subscription_ids.size(); ++s) { |
| + messages_by_subscription[subscription_ids.Get(s)].push_back( |
| + messages.Get(m).published_message()); |
| + } |
| + } |
| + |
| + // Send the messages for each subscription. |
| + for (std::map<std::string, std::vector<Message> >::const_iterator |
| + subscription = messages_by_subscription.begin(); |
| + subscription != messages_by_subscription.end(); |
| + ++subscription) { |
| + // TODO(ckehoe): Once we have the app ID from the server, we need to pass |
| + // it in here and get rid of the app id registry from the main API class. |
| + delegate_->HandleMessages("", subscription->first, subscription->second); |
| + } |
| +} |
| + |
| +RequestHeader* RpcHandler::CreateRequestHeader( |
| + const std::string& client_name) const { |
| + RequestHeader* header = new RequestHeader; |
| + |
| + header->set_allocated_framework_version( |
| + CreateVersion("Chrome", delegate_->GetPlatformVersionString())); |
| + if (!client_name.empty()) { |
| + header->set_allocated_client_version( |
| + CreateVersion(client_name, std::string())); |
| + } |
| + header->set_current_time_millis(base::Time::Now().ToJsTime()); |
| + header->set_registered_device_id(device_id_); |
| + |
| + return header; |
| +} |
| + |
| +template <class T> |
| +void RpcHandler::SendServerRequest( |
| + const std::string& rpc_name, |
| + const std::string& app_id, |
| + scoped_ptr<T> request, |
| + const HttpPost::ResponseCallback& response_handler) { |
| + request->set_allocated_header(CreateRequestHeader(app_id)); |
| + server_post_callback_.Run(delegate_->GetRequestContext(), |
| + rpc_name, |
| + make_scoped_ptr<MessageLite>(request.release()), |
| + response_handler); |
| +} |
| + |
| +void RpcHandler::AudioDirectiveListToWhispernetConnector( |
| + const std::string& token, |
| + const WhispernetClient::SamplesCallback& samples_callback) { |
| + WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); |
| + if (whispernet_client) { |
| + whispernet_client->RegisterSamplesCallback(samples_callback); |
| + whispernet_client->EncodeToken(token); |
| + } |
| } |
| } // namespace copresence |