Index: components/proximity_auth/e2e_test/cryptauth.py |
diff --git a/components/proximity_auth/e2e_test/cryptauth.py b/components/proximity_auth/e2e_test/cryptauth.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..7373fa9fb8f2f6e498826766b3ee00b70af286b5 |
--- /dev/null |
+++ b/components/proximity_auth/e2e_test/cryptauth.py |
@@ -0,0 +1,140 @@ |
+# Copyright 2015 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+import httplib |
+import json |
+import logging |
+import pprint |
+import time |
+ |
+logger = logging.getLogger('proximity_auth.%s' % __name__) |
+ |
+_GOOGLE_APIS_URL = 'www.googleapis.com' |
+_REQUEST_PATH = '/cryptauth/v1/%s?alt=JSON' |
+ |
+class CryptAuthClient(object): |
+ """ A client for making blocking CryptAuth API calls. """ |
+ |
+ def __init__(self, access_token, google_apis_url=_GOOGLE_APIS_URL): |
+ self._access_token = access_token |
+ self._google_apis_url = google_apis_url |
+ |
+ def GetMyDevices(self): |
+ """ Invokes the GetMyDevices API. |
+ |
+ Returns: |
+ A list of devices or None if the API call fails. |
+ Each device is a dictionary of the deserialized JSON returned by |
+ CryptAuth. |
+ """ |
+ request_data = { |
+ 'approvedForUnlockRequired': False, |
+ 'allowStaleRead': False, |
+ 'invocationReason': 13 # REASON_MANUAL |
+ } |
+ response = self._SendRequest('deviceSync/getmydevices', request_data) |
+ return response['devices'] if response is not None else None |
+ |
+ def GetUnlockKey(self): |
+ """ |
+ Returns: |
+ The unlock key registered with CryptAuth if it exists or None. |
+ The device is a dictionary of the deserialized JSON returned by CryptAuth. |
+ """ |
+ devices = self.GetMyDevices() |
+ if devices is None: |
+ return None |
+ |
+ for device in devices: |
+ if device['unlockKey']: |
+ return device |
+ return None |
+ |
+ def ToggleEasyUnlock(self, enable, public_key=''): |
+ """ Calls the ToggleEasyUnlock API. |
+ Args: |
+ enable: True to designate the device specified by |public_key| as an |
+ unlock key. |
+ public_key: The public key of the device to toggle. Ignored if |enable| is |
+ False, which toggles all unlock keys off. |
+ Returns: |
+ True upon success, else False. |
+ """ |
+ request_data = { 'enable': enable, } |
+ if not enable: |
+ request_data['applyToAll'] = True |
+ else: |
+ request_data['publicKey'] = public_key |
+ response = self._SendRequest('deviceSync/toggleeasyunlock', request_data) |
+ return response is not None |
+ |
+ def FindEligibleUnlockDevices(self, time_delta_millis=None): |
+ """ Finds devices eligible to be an unlock key. |
+ Args: |
+ time_delta_millis: If specified, then only return eligible devices that |
+ have contacted CryptAuth in the last time delta. |
+ Returns: |
+ A tuple containing two lists, one of eligible devices and the other of |
+ ineligible devices. |
+ Each device is a dictionary of the deserialized JSON returned by |
+ CryptAuth. |
+ """ |
+ request_data = {} |
+ if time_delta_millis is not None: |
+ request_data['maxLastUpdateTimeDeltaMillis'] = time_delta_millis * 1000; |
+ |
+ response = self._SendRequest( |
+ 'deviceSync/findeligibleunlockdevices', request_data) |
+ if response is None: |
+ return None |
+ eligibleDevices = ( |
+ response['eligibleDevices'] if 'eligibleDevices' in response else []) |
+ ineligibleDevices = ( |
+ response['ineligibleDevices'] if ( |
+ 'ineligibleDevices' in response) else []) |
+ return eligibleDevices, ineligibleDevices |
+ |
+ def PingPhones(self, timeout_secs=10): |
+ """ Asks CryptAuth to ping registered phones and determine their status. |
+ Args: |
+ timeout_secs: The number of seconds to wait for phones to respond. |
+ Returns: |
+ A tuple containing two lists, one of eligible devices and the other of |
+ ineligible devices. |
+ Each device is a dictionary of the deserialized JSON returned by |
+ CryptAuth. |
+ """ |
+ response = self._SendRequest( |
+ 'deviceSync/senddevicesynctickle', |
+ { 'tickleType': 'updateEnrollment' }) |
+ if response is None: |
+ return None |
+ # We wait for phones to update their status with CryptAuth. |
+ logger.info('Waiting for %s seconds for phone status...' % timeout_secs) |
+ time.sleep(timeout_secs) |
+ return self.FindEligibleUnlockDevices(time_delta_millis=timeout_secs) |
+ |
+ def _SendRequest(self, function_path, request_data): |
+ """ Sends an HTTP request to CryptAuth and returns the deserialized |
+ response. |
+ """ |
+ conn = httplib.HTTPSConnection(self._google_apis_url) |
+ path = _REQUEST_PATH % function_path |
+ |
+ headers = { |
+ 'authorization': 'Bearer ' + self._access_token, |
+ 'Content-Type': 'application/json' |
+ } |
+ body = json.dumps(request_data) |
+ logger.info('Making request to %s with body:\n%s' % ( |
+ path, pprint.pformat(request_data))) |
+ conn.request('POST', path, body, headers) |
+ |
+ response = conn.getresponse() |
+ if response.status == 204: |
+ return {} |
+ if response.status != 200: |
+ logger.warning('Request to %s failed: %s' % (path, response.status)) |
+ return None |
+ return json.loads(response.read()) |