Index: components/copresence/rpc/rpc_handler.cc |
diff --git a/components/copresence/rpc/rpc_handler.cc b/components/copresence/rpc/rpc_handler.cc |
index e5f12d0c4d969eaff957ac7b2aca360e232c0888..581d225abdda6ee2d55e48566b3215c9f17ab4d1 100644 |
--- a/components/copresence/rpc/rpc_handler.cc |
+++ b/components/copresence/rpc/rpc_handler.cc |
@@ -1,42 +1,455 @@ |
// Copyright 2014 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
+// Use of this source code is governed by a BSD-style license |
+// that can be found in the LICENSE file. |
#include "components/copresence/rpc/rpc_handler.h" |
+#include <map> |
+#include <sstream> |
+ |
#include "base/bind.h" |
+#include "base/guid.h" |
+#include "base/logging.h" |
+#include "base/strings/string_util.h" |
+#include "base/time/time.h" |
+#include "components/copresence/handlers/directive_handler.h" |
+#include "components/copresence/proto/codes.pb.h" |
#include "components/copresence/proto/data.pb.h" |
#include "components/copresence/proto/rpcs.pb.h" |
#include "components/copresence/public/copresence_client_delegate.h" |
-#include "components/copresence/public/whispernet_client.h" |
+#include "net/http/http_status_code.h" |
+ |
+// TODO(ckehoe): Return error messages for bad requests. |
namespace copresence { |
-RpcHandler::RpcHandler(CopresenceClientDelegate* delegate, |
- SuccessCallback init_done_callback) { |
+using google::protobuf::MessageLite; |
+using google::protobuf::RepeatedPtrField; |
+ |
+namespace { |
+ |
+// UrlSafe is defined as: |
+// '/' represented by a '_' and '+' represented by a '-' |
+// TODO(rkc): Move this to the wrapper. |
Daniel Erat
2014/08/06 16:01:17
if this is duplicated now (i know i saw it earlier
Charlie
2014/08/06 19:32:19
I have his CLs patched in, but a grep of my tree d
Daniel Erat
2014/08/06 21:35:18
yeah, that's probably what i'm thinking of.
Charlie
2014/08/06 22:36:23
Acknowledged.
|
+std::string ToUrlSafe(std::string token) { |
+ base::ReplaceChars(token, "+", "-", &token); |
+ base::ReplaceChars(token, "/", "_", &token); |
+ return token; |
} |
-RpcHandler::~RpcHandler() { |
+const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes. |
+const int kMaxInvalidTokens = 10000; |
+ |
+const char kRegisterDeviceRpcName[] = "registerdevice"; |
+const char kReportRequestRpcName[] = "report"; |
Daniel Erat
2014/08/06 16:01:16
make this be a public static const member so it is
Charlie
2014/08/06 19:32:19
Done.
|
+ |
+// Logging |
+ |
+// Logs the status and returns true if the status was OK. |
Daniel Erat
2014/08/06 16:01:16
this looks like it only logs the status if it was
Charlie
2014/08/06 19:32:19
Done.
|
+bool LogStatus(const Status& status) { |
+ if (status.code() == OK) |
+ return true; |
+ |
+ std::stringstream log_message; |
Daniel Erat
2014/08/06 16:01:16
the style guide says not to use streams except for
Charlie
2014/08/06 19:32:19
Sure. Done.
On 2014/08/06 16:01:16, Daniel Erat w
|
+ log_message << "Copresence error code " << status.code(); |
+ if (!status.message().empty()) { |
+ log_message << ": " << status.message(); |
+ } |
+ |
+ LOG(ERROR) << log_message.str(); |
+ return false; |
} |
-void RpcHandler::SendReportRequest( |
- scoped_ptr<copresence::ReportRequest> request) { |
+void LogStatus(const util::error::Code& code, const std::string& context) { |
Daniel Erat
2014/08/06 16:01:16
don't overload function names
Charlie
2014/08/06 19:32:20
Done.
|
+ LOG_IF(ERROR, code != util::error::OK) |
+ << context << " error " << code << ". See " |
+ << "cs/google3/util/task/codes.proto for more info."; |
} |
-void RpcHandler::SendReportRequest( |
- scoped_ptr<copresence::ReportRequest> request, |
- const std::string& app_id, |
- const StatusCallback& status_callback) { |
+// Logs the status and returns true if the status was OK. |
+bool LogStatus(const ReportResponse& response) { |
Daniel Erat
2014/08/06 16:01:17
don't overload function names
Charlie
2014/08/06 19:32:19
Done.
|
+ bool result = LogStatus(response.header().status()); |
+ if (response.has_manage_messages_response()) { |
Daniel Erat
2014/08/06 16:01:16
omit curly brackets for single-line statements
Charlie
2014/08/06 19:32:19
Done.
|
+ LogStatus(response.manage_messages_response().status(), "Publish"); |
+ } |
+ if (response.has_manage_subscriptions_response()) { |
+ LogStatus(response.manage_subscriptions_response().status(), "Subscribe"); |
+ } |
+ if (response.has_update_signals_response()) { |
+ LogStatus(response.update_signals_response().status(), "Update"); |
+ } |
+ return result; |
+} |
+ |
+// Request construction |
Daniel Erat
2014/08/06 16:01:16
nit: add trailing period
Charlie
2014/08/06 19:32:19
This is supposed to be a section header. Added a n
|
+template <typename T> |
+BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) { |
+ if (msg.has_token_exchange_strategy() && |
+ msg.token_exchange_strategy().has_broadcast_scan_configuration()) { |
+ return msg.token_exchange_strategy().broadcast_scan_configuration(); |
+ } |
+ return BROADCAST_SCAN_CONFIGURATION_UNKNOWN; |
+} |
+ |
+// This method will extract token exchange strategies |
+// from the publishes and subscribes in a report request. |
+BroadcastScanConfiguration ExtractTokenExchangeStrategy( |
+ ReportRequest* request) { |
Daniel Erat
2014/08/06 16:01:17
can this be a const reference?
Charlie
2014/08/06 19:32:19
Yes. Done.
On 2014/08/06 16:01:17, Daniel Erat wr
|
+ bool broadcast_only = false; |
+ bool scan_only = false; |
+ |
+ // Strategies for publishes. |
+ if (request->has_manage_messages_request()) { |
+ const RepeatedPtrField<PublishedMessage> messages = |
+ request->manage_messages_request().message_to_publish(); |
+ for (int i = 0; i < messages.size(); ++i) { |
+ BroadcastScanConfiguration config = |
+ GetBroadcastScanConfig(messages.Get(i)); |
+ broadcast_only = broadcast_only || config == BROADCAST_ONLY; |
+ scan_only = scan_only || config == SCAN_ONLY; |
+ if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only)) |
+ return BROADCAST_AND_SCAN; |
+ } |
+ } |
+ |
+ // Strategies for subscriptions. |
+ if (request->has_manage_subscriptions_request()) { |
+ const RepeatedPtrField<Subscription> messages = |
+ request->manage_subscriptions_request().subscription(); |
+ for (int i = 0; i < messages.size(); ++i) { |
+ BroadcastScanConfiguration config = |
+ GetBroadcastScanConfig(messages.Get(i)); |
+ broadcast_only = broadcast_only || config == BROADCAST_ONLY; |
+ scan_only = scan_only || config == SCAN_ONLY; |
+ if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only)) |
+ return BROADCAST_AND_SCAN; |
+ } |
+ } |
+ |
+ if (broadcast_only) |
+ return BROADCAST_ONLY; |
+ if (scan_only) |
+ return SCAN_ONLY; |
+ return BROADCAST_AND_SCAN; |
Daniel Erat
2014/08/06 16:01:16
do you need to check whether both broadcast_only a
Charlie
2014/08/06 19:32:20
No, this is our default. Added a comment.
On 2014
|
+} |
+ |
+DeviceState* GetDeviceCapabilities( |
Daniel Erat
2014/08/06 16:01:16
return a scoped_ptr instead so the ownership trans
Charlie
2014/08/06 19:32:19
Done.
|
+ ReportRequest* request) { |
Daniel Erat
2014/08/06 16:01:16
can this be a const reference?
also, unwrap this
Charlie
2014/08/06 19:32:19
Done.
|
+ DeviceState* state = new DeviceState; |
+ |
+ TokenTechnology* token_technology = |
+ state->mutable_capabilities()->add_token_technology(); |
+ token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND); |
+ |
+ BroadcastScanConfiguration config = |
+ ExtractTokenExchangeStrategy(request); |
+ if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN) |
+ token_technology->add_instruction_type(TRANSMIT); |
+ if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN) |
+ token_technology->add_instruction_type(RECEIVE); |
+ |
+ return state; |
+} |
+ |
+// TODO(ckehoe): We're keeping this code in a separate function for now |
+// because we get a version string from Chrome, but the proto expects |
+// an int64 version. We should probably change the version proto |
+// to handle a more detailed version. |
+ClientVersion* CreateVersion(const std::string& client, |
+ const std::string& version_name) { |
+ ClientVersion* version = new ClientVersion; |
+ |
+ version->set_client(client); |
+ version->set_version_name(version_name); |
+ |
+ return version; |
+} |
+ |
+// Wrapper for the http post constructor. This is the default way |
+// to contact the server, but it can be overridden for testing. |
+void SendHttpPost(net::URLRequestContextGetter* url_context_getter, |
+ const std::string& rpc_name, |
+ scoped_ptr<MessageLite> request_proto, |
+ const HttpPost::ResponseCallback& callback) { |
+ new HttpPost(url_context_getter, rpc_name, request_proto.Pass(), callback); |
+} |
+ |
+} // namespace |
+ |
+// Public methods |
+ |
+RpcHandler::RpcHandler(CopresenceClientDelegate* delegate) |
+ : delegate_(delegate), |
+ invalid_audio_token_cache_( |
+ base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs), |
+ kMaxInvalidTokens), |
+ server_post_callback_(base::Bind(&SendHttpPost)) { |
+} |
+ |
+RpcHandler::~RpcHandler() {} |
+ |
+void RpcHandler::Initialize(const SuccessCallback& init_done_callback) { |
+ scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest); |
+ DCHECK(device_id_.empty()); |
+ device_id_ = delegate_->GetDeviceId(); |
+ if (!device_id_.empty()) { |
+ init_done_callback.Run(true); |
+ return; |
+ } |
+ |
+ request->mutable_push_service()->set_service(PUSH_SERVICE_NONE); |
+ Identity* identity = |
+ request->mutable_device_identifiers()->mutable_registrant(); |
+ identity->set_type(CHROME); |
+ identity->set_chrome_id(base::GenerateGUID()); |
+ SendServerRequest( |
+ kRegisterDeviceRpcName, |
+ std::string(), |
+ request.Pass(), |
+ base::Bind(&RpcHandler::RegisterResponseHandler, |
+ AsWeakPtr(), |
+ init_done_callback)); |
+} |
+ |
+void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) { |
+ SendReportRequest(request.Pass(), std::string(), StatusCallback()); |
} |
-void RpcHandler::ReportTokens(copresence::TokenMedium medium, |
+void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request, |
+ const std::string& app_id, |
+ const StatusCallback& status_callback) { |
+ DCHECK(request.get()); |
+ DCHECK(!device_id_.empty()) |
+ << "\nRpcHandler::Initialize() must complete successfully " |
Daniel Erat
2014/08/06 16:01:17
why do you have a newline at the beginning of this
Charlie
2014/08/06 19:32:19
I liked the log message better all on one line. Bu
|
+ << "before other RpcHandler methods are called."; |
+ |
+ DVLOG(3) << "Sending report request to server."; |
+ |
+ request->mutable_update_signals_request()->set_allocated_state( |
+ GetDeviceCapabilities(request.get())); |
+ SendServerRequest( |
+ kReportRequestRpcName, |
+ app_id, |
+ request.Pass(), |
+ base::Bind( |
+ &RpcHandler::ReportResponseHandler, AsWeakPtr(), status_callback)); |
+} |
+ |
+void RpcHandler::ReportTokens(TokenMedium medium, |
const std::vector<std::string>& tokens) { |
+ DCHECK_EQ(medium, AUDIO_ULTRASOUND_PASSBAND); |
+ DCHECK(!tokens.empty()); |
+ |
+ scoped_ptr<ReportRequest> request(new ReportRequest); |
+ for (size_t i = 0; i < tokens.size(); ++i) { |
+ const std::string& token = ToUrlSafe(tokens[i]); |
+ if (invalid_audio_token_cache_.HasKey(token)) |
+ continue; |
+ |
+ DVLOG(3) << "Sending token " << token << " to server."; |
+ |
+ TokenObservation* token_observation = |
+ request->mutable_update_signals_request()->add_token_observation(); |
+ token_observation->set_token_id(token); |
+ |
+ TokenSignals* signals = token_observation->add_signals(); |
+ signals->set_medium(medium); |
+ signals->set_observed_time_millis(base::Time::Now().ToJsTime()); |
+ } |
+ SendReportRequest(request.Pass()); |
} |
-void RpcHandler::ConnectToWhispernet(WhispernetClient* whispernet_client) { |
+void RpcHandler::ConnectToWhispernet() { |
+ WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); |
+ |
+ // Directive_handler will be destructed before us, so unretained is safe. |
Daniel Erat
2014/08/06 16:01:17
s/Directive_handler/|directive_handler_|/
Charlie
2014/08/06 19:32:20
Done.
|
+ directive_handler_.reset(new DirectiveHandler( |
+ base::Bind(&WhispernetClient::DecodeSamples, |
+ base::Unretained(whispernet_client)), |
+ base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector, |
+ base::Unretained(this)))); |
+ |
+ whispernet_client->RegisterTokensCallback( |
+ base::Bind(&RpcHandler::ReportTokens, |
+ base::Unretained(this), |
xiyuan
2014/08/05 21:33:39
AsWeakPtr()?
Charlie
2014/08/06 19:32:20
Done.
|
+ AUDIO_ULTRASOUND_PASSBAND)); |
} |
void RpcHandler::DisconnectFromWhispernet() { |
+ directive_handler_.reset(); |
+} |
+ |
+// Private methods |
+ |
+void RpcHandler::RegisterResponseHandler( |
+ const SuccessCallback& init_done_callback, |
+ int http_status_code, |
+ const std::string& response_data) { |
+ if (http_status_code != net::HTTP_OK) { |
+ init_done_callback.Run(false); |
+ return; |
+ } |
+ |
+ RegisterDeviceResponse response; |
+ if (!response.ParseFromString(response_data)) { |
+ LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data; |
+ init_done_callback.Run(false); |
+ return; |
+ } |
+ |
+ if (!LogStatus(response.header().status())) |
+ return; |
+ device_id_ = response.registered_device_id(); |
+ DCHECK(!device_id_.empty()); |
+ DVLOG(2) << "Device registration successful: id " << device_id_; |
+ delegate_->SaveDeviceId(device_id_); |
+ init_done_callback.Run(true); |
+} |
+ |
+void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, |
+ int http_status_code, |
+ const std::string& response_data) { |
+ if (http_status_code != net::HTTP_OK) { |
+ if (!status_callback.is_null()) |
+ status_callback.Run(FAIL); |
+ return; |
+ } |
+ |
+ DVLOG(3) << "Received ReportResponse."; |
+ ReportResponse response; |
+ if (!response.ParseFromString(response_data)) { |
+ LOG(ERROR) << "Invalid ReportResponse"; |
+ if (!status_callback.is_null()) |
+ status_callback.Run(FAIL); |
+ return; |
+ } |
+ |
+ if (!LogStatus(response)) { |
+ if (!status_callback.is_null()) |
+ status_callback.Run(FAIL); |
+ return; |
+ } |
+ |
+ if (response.has_manage_messages_response()) { |
+ RepeatedPtrField<MessageResult> message_results = |
+ response.manage_messages_response().published_message_result(); |
+ for (int i = 0; i < message_results.size(); ++i) { |
+ DVLOG(2) << "Published message with id " |
+ << message_results.Get(i).published_message_id(); |
+ } |
+ } |
+ |
+ if (response.has_manage_subscriptions_response()) { |
+ RepeatedPtrField<SubscriptionResult> subscription_results = |
+ response.manage_subscriptions_response().subscription_result(); |
+ for (int i = 0; i < subscription_results.size(); ++i) { |
+ DVLOG(2) << "Created subscription with id " |
+ << subscription_results.Get(i).subscription_id(); |
+ } |
+ } |
+ |
+ if (response.has_update_signals_response()) { |
+ const UpdateSignalsResponse& update_response = |
+ response.update_signals_response(); |
+ DispatchMessages(update_response.message()); |
+ |
+ if (directive_handler_.get()) { |
+ for (int i = 0; i < update_response.directive_size(); ++i) |
+ directive_handler_->AddDirective(update_response.directive(i)); |
+ } else { |
+ DVLOG(1) << "No directive handler."; |
+ } |
+ |
+ RepeatedPtrField<Token> tokens = update_response.token(); |
+ for (int i = 0; i < tokens.size(); ++i) { |
+ switch (tokens.Get(i).status()) { |
+ case VALID: |
+ // TODO(rkc/ckehoe): Store the token in a valid_token_cache_ with a |
Daniel Erat
2014/08/06 16:01:16
nit: |valid_token_cache_|
Charlie
2014/08/06 19:32:19
Done.
|
+ // short ttl (like 10s) and send it up with every report request. |
Daniel Erat
2014/08/06 16:01:16
nit: s/ttl/TTL/
Charlie
2014/08/06 19:32:19
Done.
|
+ // Then we'll still get messages while we're waiting to hear it again. |
+ VLOG(1) << "Got valid token " << tokens.Get(i).id(); |
+ break; |
+ case INVALID: |
+ DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id(); |
+ invalid_audio_token_cache_.Add(tokens.Get(i).id(), true); |
+ break; |
+ default: |
+ DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code " |
+ << tokens.Get(i).status(); |
+ } |
+ } |
+ } |
+ |
+ // TODO(ckehoe): Return a more detailed status response. |
+ if (!status_callback.is_null()) |
+ status_callback.Run(SUCCESS); |
+} |
+ |
+void RpcHandler::DispatchMessages( |
+ const RepeatedPtrField<SubscribedMessage>& messages) { |
+ if (messages.size() == 0) |
+ return; |
+ |
+ // Index the messages by subscription id. |
+ std::map<std::string, std::vector<Message> > messages_by_subscription; |
+ DVLOG(3) << "Dispatching " << messages.size() << " messages"; |
+ for (int m = 0; m < messages.size(); ++m) { |
+ const RepeatedPtrField<std::string>& subscription_ids = |
+ messages.Get(m).subscription_id(); |
+ for (int s = 0; s < subscription_ids.size(); ++s) { |
+ messages_by_subscription[subscription_ids.Get(s)].push_back( |
+ messages.Get(m).published_message()); |
+ } |
+ } |
+ |
+ // Send the messages for each subscription. |
+ for (std::map<std::string, std::vector<Message> >::const_iterator |
+ subscription = messages_by_subscription.begin(); |
+ subscription != messages_by_subscription.end(); |
+ ++subscription) { |
+ // TODO(ckehoe): Once we have the app ID from the server, we need to pass |
+ // it in here and get rid of the app id registry from the main API class. |
+ delegate_->HandleMessages("", subscription->first, subscription->second); |
+ } |
+} |
+ |
+RequestHeader* RpcHandler::CreateRequestHeader( |
+ const std::string& client_name) const { |
+ RequestHeader* header = new RequestHeader; |
+ |
+ header->set_allocated_framework_version( |
+ CreateVersion("Chrome", delegate_->GetPlatformVersionString())); |
+ if (!client_name.empty()) { |
+ header->set_allocated_client_version( |
+ CreateVersion(client_name, std::string())); |
+ } |
+ header->set_current_time_millis(base::Time::Now().ToJsTime()); |
+ header->set_registered_device_id(device_id_); |
+ |
+ return header; |
+} |
+ |
+template <class T> |
+void RpcHandler::SendServerRequest( |
+ const std::string& rpc_name, |
+ const std::string& app_id, |
+ scoped_ptr<T> request, |
+ const HttpPost::ResponseCallback& response_handler) { |
+ request->set_allocated_header(CreateRequestHeader(app_id)); |
+ server_post_callback_.Run(delegate_->GetRequestContext(), |
+ rpc_name, |
+ make_scoped_ptr<MessageLite>(request.release()), |
+ response_handler); |
+} |
+ |
+void RpcHandler::AudioDirectiveListToWhispernetConnector( |
+ const std::string& token, |
+ const WhispernetClient::SamplesCallback& samples_callback) { |
+ WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); |
+ if (whispernet_client) { |
+ whispernet_client->RegisterSamplesCallback(samples_callback); |
+ whispernet_client->EncodeToken(token); |
+ } |
} |
} // namespace copresence |