Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(65)

Side by Side Diff: components/copresence/rpc/rpc_handler.cc

Issue 433283002: Adding the Copresence RpcHandler and HttpPost helper. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@directive-handler
Patch Set: Rebasing off the correct CL Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license
3 // found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 #include "components/copresence/rpc/rpc_handler.h" 5 #include "components/copresence/rpc/rpc_handler.h"
6 6
7 #include <map>
8 #include <sstream>
9
7 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/guid.h"
12 #include "base/logging.h"
13 #include "base/strings/string_util.h"
14 #include "base/time/time.h"
15 #include "components/copresence/handlers/directive_handler.h"
16 #include "components/copresence/proto/codes.pb.h"
8 #include "components/copresence/proto/data.pb.h" 17 #include "components/copresence/proto/data.pb.h"
9 #include "components/copresence/proto/rpcs.pb.h" 18 #include "components/copresence/proto/rpcs.pb.h"
10 #include "components/copresence/public/copresence_client_delegate.h" 19 #include "components/copresence/public/copresence_client_delegate.h"
11 #include "components/copresence/public/whispernet_client.h" 20 #include "net/http/http_status_code.h"
21
22 // TODO(ckehoe): Return error messages for bad requests.
12 23
13 namespace copresence { 24 namespace copresence {
14 25
15 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate, 26 using google::protobuf::MessageLite;
16 SuccessCallback init_done_callback) { 27 using google::protobuf::RepeatedPtrField;
17 } 28
18 29 namespace {
19 RpcHandler::~RpcHandler() { 30
20 } 31 // UrlSafe is defined as:
21 32 // '/' represented by a '_' and '+' represented by a '-'
22 void RpcHandler::SendReportRequest( 33 // TODO(rkc): Move this to the wrapper.
Daniel Erat 2014/08/06 16:01:17 if this is duplicated now (i know i saw it earlier
Charlie 2014/08/06 19:32:19 I have his CLs patched in, but a grep of my tree d
Daniel Erat 2014/08/06 21:35:18 yeah, that's probably what i'm thinking of.
Charlie 2014/08/06 22:36:23 Acknowledged.
23 scoped_ptr<copresence::ReportRequest> request) { 34 std::string ToUrlSafe(std::string token) {
24 } 35 base::ReplaceChars(token, "+", "-", &token);
25 36 base::ReplaceChars(token, "/", "_", &token);
26 void RpcHandler::SendReportRequest( 37 return token;
27 scoped_ptr<copresence::ReportRequest> request, 38 }
39
40 const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes.
41 const int kMaxInvalidTokens = 10000;
42
43 const char kRegisterDeviceRpcName[] = "registerdevice";
44 const char kReportRequestRpcName[] = "report";
Daniel Erat 2014/08/06 16:01:16 make this be a public static const member so it is
Charlie 2014/08/06 19:32:19 Done.
45
46 // Logging
47
48 // Logs the status and returns true if the status was OK.
Daniel Erat 2014/08/06 16:01:16 this looks like it only logs the status if it was
Charlie 2014/08/06 19:32:19 Done.
49 bool LogStatus(const Status& status) {
50 if (status.code() == OK)
51 return true;
52
53 std::stringstream log_message;
Daniel Erat 2014/08/06 16:01:16 the style guide says not to use streams except for
Charlie 2014/08/06 19:32:19 Sure. Done. On 2014/08/06 16:01:16, Daniel Erat w
54 log_message << "Copresence error code " << status.code();
55 if (!status.message().empty()) {
56 log_message << ": " << status.message();
57 }
58
59 LOG(ERROR) << log_message.str();
60 return false;
61 }
62
63 void LogStatus(const util::error::Code& code, const std::string& context) {
Daniel Erat 2014/08/06 16:01:16 don't overload function names
Charlie 2014/08/06 19:32:20 Done.
64 LOG_IF(ERROR, code != util::error::OK)
65 << context << " error " << code << ". See "
66 << "cs/google3/util/task/codes.proto for more info.";
67 }
68
69 // Logs the status and returns true if the status was OK.
70 bool LogStatus(const ReportResponse& response) {
Daniel Erat 2014/08/06 16:01:17 don't overload function names
Charlie 2014/08/06 19:32:19 Done.
71 bool result = LogStatus(response.header().status());
72 if (response.has_manage_messages_response()) {
Daniel Erat 2014/08/06 16:01:16 omit curly brackets for single-line statements
Charlie 2014/08/06 19:32:19 Done.
73 LogStatus(response.manage_messages_response().status(), "Publish");
74 }
75 if (response.has_manage_subscriptions_response()) {
76 LogStatus(response.manage_subscriptions_response().status(), "Subscribe");
77 }
78 if (response.has_update_signals_response()) {
79 LogStatus(response.update_signals_response().status(), "Update");
80 }
81 return result;
82 }
83
84 // Request construction
Daniel Erat 2014/08/06 16:01:16 nit: add trailing period
Charlie 2014/08/06 19:32:19 This is supposed to be a section header. Added a n
85 template <typename T>
86 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
87 if (msg.has_token_exchange_strategy() &&
88 msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
89 return msg.token_exchange_strategy().broadcast_scan_configuration();
90 }
91 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
92 }
93
94 // This method will extract token exchange strategies
95 // from the publishes and subscribes in a report request.
96 BroadcastScanConfiguration ExtractTokenExchangeStrategy(
97 ReportRequest* request) {
Daniel Erat 2014/08/06 16:01:17 can this be a const reference?
Charlie 2014/08/06 19:32:19 Yes. Done. On 2014/08/06 16:01:17, Daniel Erat wr
98 bool broadcast_only = false;
99 bool scan_only = false;
100
101 // Strategies for publishes.
102 if (request->has_manage_messages_request()) {
103 const RepeatedPtrField<PublishedMessage> messages =
104 request->manage_messages_request().message_to_publish();
105 for (int i = 0; i < messages.size(); ++i) {
106 BroadcastScanConfiguration config =
107 GetBroadcastScanConfig(messages.Get(i));
108 broadcast_only = broadcast_only || config == BROADCAST_ONLY;
109 scan_only = scan_only || config == SCAN_ONLY;
110 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
111 return BROADCAST_AND_SCAN;
112 }
113 }
114
115 // Strategies for subscriptions.
116 if (request->has_manage_subscriptions_request()) {
117 const RepeatedPtrField<Subscription> messages =
118 request->manage_subscriptions_request().subscription();
119 for (int i = 0; i < messages.size(); ++i) {
120 BroadcastScanConfiguration config =
121 GetBroadcastScanConfig(messages.Get(i));
122 broadcast_only = broadcast_only || config == BROADCAST_ONLY;
123 scan_only = scan_only || config == SCAN_ONLY;
124 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
125 return BROADCAST_AND_SCAN;
126 }
127 }
128
129 if (broadcast_only)
130 return BROADCAST_ONLY;
131 if (scan_only)
132 return SCAN_ONLY;
133 return BROADCAST_AND_SCAN;
Daniel Erat 2014/08/06 16:01:16 do you need to check whether both broadcast_only a
Charlie 2014/08/06 19:32:20 No, this is our default. Added a comment. On 2014
134 }
135
136 DeviceState* GetDeviceCapabilities(
Daniel Erat 2014/08/06 16:01:16 return a scoped_ptr instead so the ownership trans
Charlie 2014/08/06 19:32:19 Done.
137 ReportRequest* request) {
Daniel Erat 2014/08/06 16:01:16 can this be a const reference? also, unwrap this
Charlie 2014/08/06 19:32:19 Done.
138 DeviceState* state = new DeviceState;
139
140 TokenTechnology* token_technology =
141 state->mutable_capabilities()->add_token_technology();
142 token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND);
143
144 BroadcastScanConfiguration config =
145 ExtractTokenExchangeStrategy(request);
146 if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN)
147 token_technology->add_instruction_type(TRANSMIT);
148 if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN)
149 token_technology->add_instruction_type(RECEIVE);
150
151 return state;
152 }
153
154 // TODO(ckehoe): We're keeping this code in a separate function for now
155 // because we get a version string from Chrome, but the proto expects
156 // an int64 version. We should probably change the version proto
157 // to handle a more detailed version.
158 ClientVersion* CreateVersion(const std::string& client,
159 const std::string& version_name) {
160 ClientVersion* version = new ClientVersion;
161
162 version->set_client(client);
163 version->set_version_name(version_name);
164
165 return version;
166 }
167
168 // Wrapper for the http post constructor. This is the default way
169 // to contact the server, but it can be overridden for testing.
170 void SendHttpPost(net::URLRequestContextGetter* url_context_getter,
171 const std::string& rpc_name,
172 scoped_ptr<MessageLite> request_proto,
173 const HttpPost::ResponseCallback& callback) {
174 new HttpPost(url_context_getter, rpc_name, request_proto.Pass(), callback);
175 }
176
177 } // namespace
178
179 // Public methods
180
181 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate)
182 : delegate_(delegate),
183 invalid_audio_token_cache_(
184 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
185 kMaxInvalidTokens),
186 server_post_callback_(base::Bind(&SendHttpPost)) {
187 }
188
189 RpcHandler::~RpcHandler() {}
190
191 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
192 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
193 DCHECK(device_id_.empty());
194 device_id_ = delegate_->GetDeviceId();
195 if (!device_id_.empty()) {
196 init_done_callback.Run(true);
197 return;
198 }
199
200 request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
201 Identity* identity =
202 request->mutable_device_identifiers()->mutable_registrant();
203 identity->set_type(CHROME);
204 identity->set_chrome_id(base::GenerateGUID());
205 SendServerRequest(
206 kRegisterDeviceRpcName,
207 std::string(),
208 request.Pass(),
209 base::Bind(&RpcHandler::RegisterResponseHandler,
210 AsWeakPtr(),
211 init_done_callback));
212 }
213
214 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
215 SendReportRequest(request.Pass(), std::string(), StatusCallback());
216 }
217
218 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
219 const std::string& app_id,
220 const StatusCallback& status_callback) {
221 DCHECK(request.get());
222 DCHECK(!device_id_.empty())
223 << "\nRpcHandler::Initialize() must complete successfully "
Daniel Erat 2014/08/06 16:01:17 why do you have a newline at the beginning of this
Charlie 2014/08/06 19:32:19 I liked the log message better all on one line. Bu
224 << "before other RpcHandler methods are called.";
225
226 DVLOG(3) << "Sending report request to server.";
227
228 request->mutable_update_signals_request()->set_allocated_state(
229 GetDeviceCapabilities(request.get()));
230 SendServerRequest(
231 kReportRequestRpcName,
232 app_id,
233 request.Pass(),
234 base::Bind(
235 &RpcHandler::ReportResponseHandler, AsWeakPtr(), status_callback));
236 }
237
238 void RpcHandler::ReportTokens(TokenMedium medium,
239 const std::vector<std::string>& tokens) {
240 DCHECK_EQ(medium, AUDIO_ULTRASOUND_PASSBAND);
241 DCHECK(!tokens.empty());
242
243 scoped_ptr<ReportRequest> request(new ReportRequest);
244 for (size_t i = 0; i < tokens.size(); ++i) {
245 const std::string& token = ToUrlSafe(tokens[i]);
246 if (invalid_audio_token_cache_.HasKey(token))
247 continue;
248
249 DVLOG(3) << "Sending token " << token << " to server.";
250
251 TokenObservation* token_observation =
252 request->mutable_update_signals_request()->add_token_observation();
253 token_observation->set_token_id(token);
254
255 TokenSignals* signals = token_observation->add_signals();
256 signals->set_medium(medium);
257 signals->set_observed_time_millis(base::Time::Now().ToJsTime());
258 }
259 SendReportRequest(request.Pass());
260 }
261
262 void RpcHandler::ConnectToWhispernet() {
263 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
264
265 // Directive_handler will be destructed before us, so unretained is safe.
Daniel Erat 2014/08/06 16:01:17 s/Directive_handler/|directive_handler_|/
Charlie 2014/08/06 19:32:20 Done.
266 directive_handler_.reset(new DirectiveHandler(
267 base::Bind(&WhispernetClient::DecodeSamples,
268 base::Unretained(whispernet_client)),
269 base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
270 base::Unretained(this))));
271
272 whispernet_client->RegisterTokensCallback(
273 base::Bind(&RpcHandler::ReportTokens,
274 base::Unretained(this),
xiyuan 2014/08/05 21:33:39 AsWeakPtr()?
Charlie 2014/08/06 19:32:20 Done.
275 AUDIO_ULTRASOUND_PASSBAND));
276 }
277
278 void RpcHandler::DisconnectFromWhispernet() {
279 directive_handler_.reset();
280 }
281
282 // Private methods
283
284 void RpcHandler::RegisterResponseHandler(
285 const SuccessCallback& init_done_callback,
286 int http_status_code,
287 const std::string& response_data) {
288 if (http_status_code != net::HTTP_OK) {
289 init_done_callback.Run(false);
290 return;
291 }
292
293 RegisterDeviceResponse response;
294 if (!response.ParseFromString(response_data)) {
295 LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
296 init_done_callback.Run(false);
297 return;
298 }
299
300 if (!LogStatus(response.header().status()))
301 return;
302 device_id_ = response.registered_device_id();
303 DCHECK(!device_id_.empty());
304 DVLOG(2) << "Device registration successful: id " << device_id_;
305 delegate_->SaveDeviceId(device_id_);
306 init_done_callback.Run(true);
307 }
308
309 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
310 int http_status_code,
311 const std::string& response_data) {
312 if (http_status_code != net::HTTP_OK) {
313 if (!status_callback.is_null())
314 status_callback.Run(FAIL);
315 return;
316 }
317
318 DVLOG(3) << "Received ReportResponse.";
319 ReportResponse response;
320 if (!response.ParseFromString(response_data)) {
321 LOG(ERROR) << "Invalid ReportResponse";
322 if (!status_callback.is_null())
323 status_callback.Run(FAIL);
324 return;
325 }
326
327 if (!LogStatus(response)) {
328 if (!status_callback.is_null())
329 status_callback.Run(FAIL);
330 return;
331 }
332
333 if (response.has_manage_messages_response()) {
334 RepeatedPtrField<MessageResult> message_results =
335 response.manage_messages_response().published_message_result();
336 for (int i = 0; i < message_results.size(); ++i) {
337 DVLOG(2) << "Published message with id "
338 << message_results.Get(i).published_message_id();
339 }
340 }
341
342 if (response.has_manage_subscriptions_response()) {
343 RepeatedPtrField<SubscriptionResult> subscription_results =
344 response.manage_subscriptions_response().subscription_result();
345 for (int i = 0; i < subscription_results.size(); ++i) {
346 DVLOG(2) << "Created subscription with id "
347 << subscription_results.Get(i).subscription_id();
348 }
349 }
350
351 if (response.has_update_signals_response()) {
352 const UpdateSignalsResponse& update_response =
353 response.update_signals_response();
354 DispatchMessages(update_response.message());
355
356 if (directive_handler_.get()) {
357 for (int i = 0; i < update_response.directive_size(); ++i)
358 directive_handler_->AddDirective(update_response.directive(i));
359 } else {
360 DVLOG(1) << "No directive handler.";
361 }
362
363 RepeatedPtrField<Token> tokens = update_response.token();
364 for (int i = 0; i < tokens.size(); ++i) {
365 switch (tokens.Get(i).status()) {
366 case VALID:
367 // TODO(rkc/ckehoe): Store the token in a valid_token_cache_ with a
Daniel Erat 2014/08/06 16:01:16 nit: |valid_token_cache_|
Charlie 2014/08/06 19:32:19 Done.
368 // short ttl (like 10s) and send it up with every report request.
Daniel Erat 2014/08/06 16:01:16 nit: s/ttl/TTL/
Charlie 2014/08/06 19:32:19 Done.
369 // Then we'll still get messages while we're waiting to hear it again.
370 VLOG(1) << "Got valid token " << tokens.Get(i).id();
371 break;
372 case INVALID:
373 DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
374 invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
375 break;
376 default:
377 DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
378 << tokens.Get(i).status();
379 }
380 }
381 }
382
383 // TODO(ckehoe): Return a more detailed status response.
384 if (!status_callback.is_null())
385 status_callback.Run(SUCCESS);
386 }
387
388 void RpcHandler::DispatchMessages(
389 const RepeatedPtrField<SubscribedMessage>& messages) {
390 if (messages.size() == 0)
391 return;
392
393 // Index the messages by subscription id.
394 std::map<std::string, std::vector<Message> > messages_by_subscription;
395 DVLOG(3) << "Dispatching " << messages.size() << " messages";
396 for (int m = 0; m < messages.size(); ++m) {
397 const RepeatedPtrField<std::string>& subscription_ids =
398 messages.Get(m).subscription_id();
399 for (int s = 0; s < subscription_ids.size(); ++s) {
400 messages_by_subscription[subscription_ids.Get(s)].push_back(
401 messages.Get(m).published_message());
402 }
403 }
404
405 // Send the messages for each subscription.
406 for (std::map<std::string, std::vector<Message> >::const_iterator
407 subscription = messages_by_subscription.begin();
408 subscription != messages_by_subscription.end();
409 ++subscription) {
410 // TODO(ckehoe): Once we have the app ID from the server, we need to pass
411 // it in here and get rid of the app id registry from the main API class.
412 delegate_->HandleMessages("", subscription->first, subscription->second);
413 }
414 }
415
416 RequestHeader* RpcHandler::CreateRequestHeader(
417 const std::string& client_name) const {
418 RequestHeader* header = new RequestHeader;
419
420 header->set_allocated_framework_version(
421 CreateVersion("Chrome", delegate_->GetPlatformVersionString()));
422 if (!client_name.empty()) {
423 header->set_allocated_client_version(
424 CreateVersion(client_name, std::string()));
425 }
426 header->set_current_time_millis(base::Time::Now().ToJsTime());
427 header->set_registered_device_id(device_id_);
428
429 return header;
430 }
431
432 template <class T>
433 void RpcHandler::SendServerRequest(
434 const std::string& rpc_name,
28 const std::string& app_id, 435 const std::string& app_id,
29 const StatusCallback& status_callback) { 436 scoped_ptr<T> request,
30 } 437 const HttpPost::ResponseCallback& response_handler) {
31 438 request->set_allocated_header(CreateRequestHeader(app_id));
32 void RpcHandler::ReportTokens(copresence::TokenMedium medium, 439 server_post_callback_.Run(delegate_->GetRequestContext(),
33 const std::vector<std::string>& tokens) { 440 rpc_name,
34 } 441 make_scoped_ptr<MessageLite>(request.release()),
35 442 response_handler);
36 void RpcHandler::ConnectToWhispernet(WhispernetClient* whispernet_client) { 443 }
37 } 444
38 445 void RpcHandler::AudioDirectiveListToWhispernetConnector(
39 void RpcHandler::DisconnectFromWhispernet() { 446 const std::string& token,
447 const WhispernetClient::SamplesCallback& samples_callback) {
448 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
449 if (whispernet_client) {
450 whispernet_client->RegisterSamplesCallback(samples_callback);
451 whispernet_client->EncodeToken(token);
452 }
40 } 453 }
41 454
42 } // namespace copresence 455 } // namespace copresence
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698