Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(268)

Side by Side Diff: components/copresence/rpc/rpc_handler.cc

Issue 433283002: Adding the Copresence RpcHandler and HttpPost helper. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@directive-handler
Patch Set: RpcHandler deletes HttpPosts Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/copresence/rpc/rpc_handler.h" 5 #include "components/copresence/rpc/rpc_handler.h"
6 6
7 #include <map>
8
7 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/command_line.h"
11 #include "base/guid.h"
12 #include "base/logging.h"
13 #include "base/strings/string_util.h"
14 #include "base/time/time.h"
15 #include "components/copresence/copresence_switches.h"
16 #include "components/copresence/handlers/directive_handler.h"
17 #include "components/copresence/proto/codes.pb.h"
8 #include "components/copresence/proto/data.pb.h" 18 #include "components/copresence/proto/data.pb.h"
9 #include "components/copresence/proto/rpcs.pb.h" 19 #include "components/copresence/proto/rpcs.pb.h"
10 #include "components/copresence/public/copresence_client_delegate.h" 20 #include "components/copresence/public/copresence_client_delegate.h"
11 #include "components/copresence/public/whispernet_client.h" 21 #include "net/http/http_status_code.h"
22
23 // TODO(ckehoe): Return error messages for bad requests.
12 24
13 namespace copresence { 25 namespace copresence {
14 26
15 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate, 27 using google::protobuf::MessageLite;
16 SuccessCallback init_done_callback) { 28 using google::protobuf::RepeatedPtrField;
17 } 29
30 const char RpcHandler::kReportRequestRpcName[] = "report";
31
32 namespace {
33
34 // UrlSafe is defined as:
35 // '/' represented by a '_' and '+' represented by a '-'
36 // TODO(rkc): Move this to the wrapper.
37 std::string ToUrlSafe(std::string token) {
38 base::ReplaceChars(token, "+", "-", &token);
39 base::ReplaceChars(token, "/", "_", &token);
40 return token;
41 }
42
43 const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes.
44 const int kMaxInvalidTokens = 10000;
45 const char kRegisterDeviceRpcName[] = "registerdevice";
46 const char kDefaultCopresenceServer[] =
47 "https://www.googleapis.com/copresence/v2/copresence";
48
49 // Logging
50
51 // Checks for a copresence error. If there is one, logs it and returns true.
52 bool CopresenceErrorLogged(const Status& status) {
53 if (status.code() != OK) {
54 LOG(ERROR) << "Copresence error code " << status.code()
55 << (status.message().empty() ? std::string() :
56 ": " + status.message());
57 }
58 return status.code() != OK;
59 }
60
61 void LogIfErrorStatus(const util::error::Code& code,
62 const std::string& context) {
63 LOG_IF(ERROR, code != util::error::OK)
64 << context << " error " << code << ". See "
65 << "cs/google3/util/task/codes.proto for more info.";
66 }
67
68 // If any errors occurred, logs them and returns true.
69 bool ReportErrorLogged(const ReportResponse& response) {
70 bool result = CopresenceErrorLogged(response.header().status());
71
72 // The Report fails or succeeds as a unit. If any responses had errors,
73 // the header will too. Thus we don't need to propagate individual errors.
74 if (response.has_update_signals_response())
75 LogIfErrorStatus(response.update_signals_response().status(), "Update");
76 if (response.has_manage_messages_response())
77 LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
78 if (response.has_manage_subscriptions_response()) {
79 LogIfErrorStatus(response.manage_subscriptions_response().status(),
80 "Subscribe");
81 }
82
83 return result;
84 }
85
86 // Request construction
87
88 template <typename T>
89 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
90 if (msg.has_token_exchange_strategy() &&
91 msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
92 return msg.token_exchange_strategy().broadcast_scan_configuration();
93 }
94 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
95 }
96
97 // This method will extract token exchange strategies
98 // from the publishes and subscribes in a report request.
99 // TODO(ckehoe): Delete this when the server supports
100 // BroadcastScanConfiguration.
101 BroadcastScanConfiguration ExtractTokenExchangeStrategy(
102 const ReportRequest& request) {
103 bool broadcast_only = false;
104 bool scan_only = false;
105
106 // Strategies for publishes.
107 if (request.has_manage_messages_request()) {
108 const RepeatedPtrField<PublishedMessage> messages =
109 request.manage_messages_request().message_to_publish();
110 for (int i = 0; i < messages.size(); ++i) {
111 BroadcastScanConfiguration config =
112 GetBroadcastScanConfig(messages.Get(i));
113 broadcast_only = broadcast_only || config == BROADCAST_ONLY;
114 scan_only = scan_only || config == SCAN_ONLY;
115 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
116 return BROADCAST_AND_SCAN;
117 }
118 }
119
120 // Strategies for subscriptions.
121 if (request.has_manage_subscriptions_request()) {
122 const RepeatedPtrField<Subscription> messages =
123 request.manage_subscriptions_request().subscription();
124 for (int i = 0; i < messages.size(); ++i) {
125 BroadcastScanConfiguration config =
126 GetBroadcastScanConfig(messages.Get(i));
127 broadcast_only = broadcast_only || config == BROADCAST_ONLY;
128 scan_only = scan_only || config == SCAN_ONLY;
129 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
130 return BROADCAST_AND_SCAN;
131 }
132 }
133
134 if (broadcast_only)
135 return BROADCAST_ONLY;
136 if (scan_only)
137 return SCAN_ONLY;
138
139 // If nothing else is specified, default to both broadcast and scan.
140 return BROADCAST_AND_SCAN;
141 }
142
143 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
144 scoped_ptr<DeviceState> state(new DeviceState);
145
146 TokenTechnology* token_technology =
147 state->mutable_capabilities()->add_token_technology();
148 token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND);
149
150 BroadcastScanConfiguration config =
151 ExtractTokenExchangeStrategy(request);
152 if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN)
153 token_technology->add_instruction_type(TRANSMIT);
154 if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN)
155 token_technology->add_instruction_type(RECEIVE);
156
157 return state.Pass();
158 }
159
160 // TODO(ckehoe): We're keeping this code in a separate function for now
161 // because we get a version string from Chrome, but the proto expects
162 // an int64 version. We should probably change the version proto
163 // to handle a more detailed version.
164 ClientVersion* CreateVersion(const std::string& client,
165 const std::string& version_name) {
166 ClientVersion* version = new ClientVersion;
167
168 version->set_client(client);
169 version->set_version_name(version_name);
170
171 return version;
172 }
173
174 } // namespace
175
176 // Public methods
177
178 RpcHandler::RpcHandler(CopresenceClientDelegate* delegate)
179 : delegate_(delegate),
180 invalid_audio_token_cache_(
181 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
182 kMaxInvalidTokens),
183 server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
184 base::Unretained(this))) {}
18 185
19 RpcHandler::~RpcHandler() { 186 RpcHandler::~RpcHandler() {
20 } 187 for (std::set<HttpPost*>::iterator post = pending_posts_.begin();
21 188 post != pending_posts_.end(); ++post) {
22 void RpcHandler::SendReportRequest( 189 delete *post;
23 scoped_ptr<copresence::ReportRequest> request) { 190 }
24 } 191
25 192 if (delegate_ && delegate_->GetWhispernetClient()) {
26 void RpcHandler::SendReportRequest( 193 delegate_->GetWhispernetClient()->RegisterTokensCallback(
27 scoped_ptr<copresence::ReportRequest> request, 194 WhispernetClient::TokensCallback());
195 }
196 }
197
198 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
199 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
200 DCHECK(device_id_.empty());
201 device_id_ = delegate_->GetDeviceId();
202 if (!device_id_.empty()) {
203 init_done_callback.Run(true);
204 return;
205 }
206
207 request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
208 Identity* identity =
209 request->mutable_device_identifiers()->mutable_registrant();
210 identity->set_type(CHROME);
211 identity->set_chrome_id(base::GenerateGUID());
212 SendServerRequest(
213 kRegisterDeviceRpcName,
214 std::string(),
215 request.Pass(),
216 base::Bind(&RpcHandler::RegisterResponseHandler,
217 // On delete, this request will be cancelled.
rkc 2014/08/07 23:41:18 nit: s/delete/destruction
Charlie 2014/08/07 23:57:11 Done.
218 base::Unretained(this),
219 init_done_callback));
220 }
221
222 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
223 SendReportRequest(request.Pass(), std::string(), StatusCallback());
224 }
225
226 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
227 const std::string& app_id,
228 const StatusCallback& status_callback) {
229 DCHECK(request.get());
230 DCHECK(!device_id_.empty())
231 << "RpcHandler::Initialize() must complete successfully "
232 << "before other RpcHandler methods are called.";
233
234 DVLOG(3) << "Sending report request to server.";
235
236 request->mutable_update_signals_request()->set_allocated_state(
237 GetDeviceCapabilities(*request).release());
238 SendServerRequest(kReportRequestRpcName,
239 app_id,
240 request.Pass(),
241 base::Bind(&RpcHandler::ReportResponseHandler,
242 // On delete, this request will be cancelled.
rkc 2014/08/07 23:41:18 nit: ditto
Charlie 2014/08/07 23:57:11 Done.
243 base::Unretained(this),
244 status_callback));
245 }
246
247 void RpcHandler::ReportTokens(TokenMedium medium,
248 const std::vector<std::string>& tokens) {
249 DCHECK_EQ(medium, AUDIO_ULTRASOUND_PASSBAND);
250 DCHECK(!tokens.empty());
251
252 scoped_ptr<ReportRequest> request(new ReportRequest);
253 for (size_t i = 0; i < tokens.size(); ++i) {
254 const std::string& token = ToUrlSafe(tokens[i]);
255 if (invalid_audio_token_cache_.HasKey(token))
256 continue;
257
258 DVLOG(3) << "Sending token " << token << " to server.";
259
260 TokenObservation* token_observation =
261 request->mutable_update_signals_request()->add_token_observation();
262 token_observation->set_token_id(token);
263
264 TokenSignals* signals = token_observation->add_signals();
265 signals->set_medium(medium);
266 signals->set_observed_time_millis(base::Time::Now().ToJsTime());
267 }
268 SendReportRequest(request.Pass());
269 }
270
271 void RpcHandler::ConnectToWhispernet() {
272 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
273
274 // |directive_handler_| will be destructed with us, so unretained is safe.
275 directive_handler_.reset(new DirectiveHandler);
276 directive_handler_->Initialize(
277 base::Bind(&WhispernetClient::DecodeSamples,
278 base::Unretained(whispernet_client)),
279 base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
280 base::Unretained(this)));
281
282 whispernet_client->RegisterTokensCallback(
283 base::Bind(&RpcHandler::ReportTokens,
284 // On delete, this callback will be disconnected.
285 base::Unretained(this),
286 AUDIO_ULTRASOUND_PASSBAND));
287 }
288
289 // Private methods
290
291 void RpcHandler::RegisterResponseHandler(
292 const SuccessCallback& init_done_callback,
293 int http_status_code,
294 const std::string& response_data,
295 HttpPost* completed_post) {
296 if (completed_post) {
297 DCHECK(pending_posts_.erase(completed_post));
298 delete completed_post;
299 }
300
301 if (http_status_code != net::HTTP_OK) {
302 init_done_callback.Run(false);
303 return;
304 }
305
306 RegisterDeviceResponse response;
307 if (!response.ParseFromString(response_data)) {
308 LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
309 init_done_callback.Run(false);
310 return;
311 }
312
313 if (CopresenceErrorLogged(response.header().status()))
314 return;
315 device_id_ = response.registered_device_id();
316 DCHECK(!device_id_.empty());
317 DVLOG(2) << "Device registration successful: id " << device_id_;
318 delegate_->SaveDeviceId(device_id_);
319 init_done_callback.Run(true);
320 }
321
322 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
323 int http_status_code,
324 const std::string& response_data,
325 HttpPost* completed_post) {
326 if (completed_post) {
327 DCHECK(pending_posts_.erase(completed_post));
328 delete completed_post;
329 }
330
331 if (http_status_code != net::HTTP_OK) {
332 if (!status_callback.is_null())
333 status_callback.Run(FAIL);
334 return;
335 }
336
337 DVLOG(3) << "Received ReportResponse.";
338 ReportResponse response;
339 if (!response.ParseFromString(response_data)) {
340 LOG(ERROR) << "Invalid ReportResponse";
341 if (!status_callback.is_null())
342 status_callback.Run(FAIL);
343 return;
344 }
345
346 if (ReportErrorLogged(response)) {
347 if (!status_callback.is_null())
348 status_callback.Run(FAIL);
349 return;
350 }
351
352 const RepeatedPtrField<MessageResult>& message_results =
353 response.manage_messages_response().published_message_result();
354 for (int i = 0; i < message_results.size(); ++i) {
355 DVLOG(2) << "Published message with id "
356 << message_results.Get(i).published_message_id();
357 }
358
359 const RepeatedPtrField<SubscriptionResult>& subscription_results =
360 response.manage_subscriptions_response().subscription_result();
361 for (int i = 0; i < subscription_results.size(); ++i) {
362 DVLOG(2) << "Created subscription with id "
363 << subscription_results.Get(i).subscription_id();
364 }
365
366 if (response.has_update_signals_response()) {
367 const UpdateSignalsResponse& update_response =
368 response.update_signals_response();
369 DispatchMessages(update_response.message());
370
371 if (directive_handler_.get()) {
372 for (int i = 0; i < update_response.directive_size(); ++i)
373 directive_handler_->AddDirective(update_response.directive(i));
374 } else {
375 DVLOG(1) << "No directive handler.";
376 }
377
378 const RepeatedPtrField<Token>& tokens = update_response.token();
379 for (int i = 0; i < tokens.size(); ++i) {
380 switch (tokens.Get(i).status()) {
381 case VALID:
382 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
383 // short TTL (like 10s) and send it up with every report request.
384 // Then we'll still get messages while we're waiting to hear it again.
385 VLOG(1) << "Got valid token " << tokens.Get(i).id();
386 break;
387 case INVALID:
388 DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
389 invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
390 break;
391 default:
392 DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
393 << tokens.Get(i).status();
394 }
395 }
396 }
397
398 // TODO(ckehoe): Return a more detailed status response.
399 if (!status_callback.is_null())
400 status_callback.Run(SUCCESS);
401 }
402
403 void RpcHandler::DispatchMessages(
404 const RepeatedPtrField<SubscribedMessage>& messages) {
405 if (messages.size() == 0)
406 return;
407
408 // Index the messages by subscription id.
409 std::map<std::string, std::vector<Message> > messages_by_subscription;
410 DVLOG(3) << "Dispatching " << messages.size() << " messages";
411 for (int m = 0; m < messages.size(); ++m) {
412 const RepeatedPtrField<std::string>& subscription_ids =
413 messages.Get(m).subscription_id();
414 for (int s = 0; s < subscription_ids.size(); ++s) {
415 messages_by_subscription[subscription_ids.Get(s)].push_back(
416 messages.Get(m).published_message());
417 }
418 }
419
420 // Send the messages for each subscription.
421 for (std::map<std::string, std::vector<Message> >::const_iterator
422 subscription = messages_by_subscription.begin();
423 subscription != messages_by_subscription.end();
424 ++subscription) {
425 // TODO(ckehoe): Once we have the app ID from the server, we need to pass
426 // it in here and get rid of the app id registry from the main API class.
427 delegate_->HandleMessages("", subscription->first, subscription->second);
428 }
429 }
430
431 RequestHeader* RpcHandler::CreateRequestHeader(
432 const std::string& client_name) const {
433 RequestHeader* header = new RequestHeader;
434
435 header->set_allocated_framework_version(
436 CreateVersion("Chrome", delegate_->GetPlatformVersionString()));
437 if (!client_name.empty()) {
438 header->set_allocated_client_version(
439 CreateVersion(client_name, std::string()));
440 }
441 header->set_current_time_millis(base::Time::Now().ToJsTime());
442 header->set_registered_device_id(device_id_);
443
444 return header;
445 }
446
447 template <class T>
448 void RpcHandler::SendServerRequest(
449 const std::string& rpc_name,
28 const std::string& app_id, 450 const std::string& app_id,
29 const StatusCallback& status_callback) { 451 scoped_ptr<T> request,
30 } 452 const HttpPost::ResponseCallback& response_handler) {
31 453 request->set_allocated_header(CreateRequestHeader(app_id));
32 void RpcHandler::ReportTokens(copresence::TokenMedium medium, 454 server_post_callback_.Run(delegate_->GetRequestContext(),
33 const std::vector<std::string>& tokens) { 455 rpc_name,
34 } 456 make_scoped_ptr<MessageLite>(request.release()),
35 457 response_handler);
36 void RpcHandler::ConnectToWhispernet(WhispernetClient* whispernet_client) { 458 }
37 } 459
38 460 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
39 void RpcHandler::DisconnectFromWhispernet() { 461 const std::string& rpc_name,
462 scoped_ptr<MessageLite> request_proto,
463 const HttpPost::ResponseCallback& callback) {
464 // Create the base URL to call.
465 CommandLine* command_line = CommandLine::ForCurrentProcess();
466 const std::string copresence_server_host =
467 command_line->HasSwitch(switches::kCopresenceServer) ?
468 command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
469 kDefaultCopresenceServer;
470
471 // Create the request and keep a pointer until it completes.
472 pending_posts_.insert(new HttpPost(url_context_getter,
473 copresence_server_host,
474 rpc_name,
475 *request_proto,
476 callback));
477 }
478
479 void RpcHandler::AudioDirectiveListToWhispernetConnector(
480 const std::string& token,
481 const WhispernetClient::SamplesCallback& samples_callback) {
482 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
483 if (whispernet_client) {
484 whispernet_client->RegisterSamplesCallback(samples_callback);
485 whispernet_client->EncodeToken(token);
486 }
40 } 487 }
41 488
42 } // namespace copresence 489 } // namespace copresence
OLDNEW
« no previous file with comments | « components/copresence/rpc/rpc_handler.h ('k') | components/copresence/rpc/rpc_handler_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698