Chromium Code Reviews| Index: components/proximity_auth/e2e_test/cryptauth.py |
| diff --git a/components/proximity_auth/e2e_test/cryptauth.py b/components/proximity_auth/e2e_test/cryptauth.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..020d73856693f1ed33cf600762f715184b477828 |
| --- /dev/null |
| +++ b/components/proximity_auth/e2e_test/cryptauth.py |
| @@ -0,0 +1,139 @@ |
| +# Copyright 2015 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +import httplib |
| +import json |
| +import logging |
| +import pprint |
| +import time |
| + |
| +logger = logging.getLogger('proximity_auth.%s' % __name__) |
| + |
| +_GOOGLE_APIS_URL = 'www.googleapis.com' |
| +_REQUEST_PATH = '/cryptauth/v1/%s?alt=JSON' |
| + |
| +class CryptAuthClient(object): |
| + """ A client for making blocking CryptAuth API calls. """ |
| + |
| + def __init__(self, access_token, google_apis_url=_GOOGLE_APIS_URL): |
| + self._access_token = access_token |
| + self._google_apis_url = google_apis_url |
| + |
| + def GetMyDevices(self): |
| + """ Invokes the GetMyDevices API. |
| + |
| + Returns: |
| + A list of devices or None if the API call fails. |
| + Each device is a dictionary of the deserialized JSON returned by |
| + CryptAuth. |
| + """ |
| + request_data = { |
| + 'approvedForUnlockRequired': False, |
| + 'allowStaleRead': False, |
| + 'invocationReason': 13 # REASON_MANUAL |
| + } |
| + response = self._SendRequest('deviceSync/getmydevices', request_data) |
| + return response['devices'] if response is not None else None |
| + |
| + def GetUnlockKey(self): |
| + """ |
| + Returns: |
| + The unlock key registered with CryptAuth if it exists or None. |
| + The device is a dictionary of the deserialized JSON returned by CryptAuth. |
| + """ |
| + devices = self.GetMyDevices() |
| + if devices is None: |
| + return None |
| + |
| + for device in devices: |
| + if device['unlockKey']: |
| + return device |
| + return None |
| + |
| + def ToggleEasyUnlock(self, enable, public_key=''): |
| + """ Calls the ToggleEasyUnlock API. |
| + Args: |
| + enable: True to designate the device specified by |public_key| as an |
| + unlock key. |
| + public_key: The public key of the device to toggle. Ignored if |enable| is |
| + False, which toggles all unlock keys off. |
| + Returns: |
| + True upon success, else False. |
| + """ |
| + request_data = { 'enable': enable, } |
| + if not enable: |
| + request_data['applyToAll'] = True |
| + else: |
| + request_data['publicKey'] = public_key |
| + response = self._SendRequest('deviceSync/toggleeasyunlock', request_data) |
| + return response is not None |
| + |
| + def FindEligibleUnlockDevices(self, time_delta_millis=None): |
| + """ Finds devices eligible to be an unlock key. |
| + Args: |
| + time_delta_millis: If specified, then only return eligible devices that |
| + have contacted CryptAuth in the last time delta. |
| + Returns: |
| + A tuple containing two lists, one of eligible devices and the other of |
| + ineligible devices. |
| + Each device is a dictionary of the deserialized JSON returned by |
| + CryptAuth. |
| + """ |
| + request_data = {} |
| + if time_delta_millis is not None: |
| + request_data['maxLastUpdateTimeDeltaMillis'] = time_delta_millis * 1000; |
| + |
| + response = self._SendRequest( |
| + 'deviceSync/findeligibleunlockdevices', request_data) |
| + if response is None: |
| + return None |
| + eligibleDevices = response['eligibleDevices'] if \ |
| + 'eligibleDevices' in response else [] |
|
Ilya Sherman
2015/03/24 01:25:14
nit: I think this is normally wrapped like so:
Tim Song
2015/03/25 23:08:44
Done.
|
| + ineligibleDevices = response['ineligibleDevices'] if \ |
| + 'ineligibleDevices' in response else [] |
| + return eligibleDevices, ineligibleDevices |
| + |
| + def PingPhones(self, timeout_secs=10): |
| + """ Asks CryptAuth to ping registered phones and determine their status. |
| + Args: |
| + timeout_secs: The number of seconds to wait for phones to respond. |
| + Returns: |
| + A tuple containing two lists, one of eligible devices and the other of |
| + ineligible devices. |
| + Each device is a dictionary of the deserialized JSON returned by |
| + CryptAuth. |
| + """ |
| + response = self._SendRequest( |
| + 'deviceSync/senddevicesynctickle', |
| + { 'tickleType': 'updateEnrollment' }) |
| + if response is None: |
| + return None |
| + # We wait for phones to update their status with CryptAuth. |
| + logger.info('Waiting for %s seconds for phone status...' % timeout_secs) |
| + time.sleep(timeout_secs) |
| + return self.FindEligibleUnlockDevices(time_delta_millis=timeout_secs) |
| + |
| + def _SendRequest(self, function_path, request_data): |
| + """ Sends an HTTP request to CryptAuth and returns the deserialized |
| + response. |
| + """ |
| + conn = httplib.HTTPSConnection(self._google_apis_url) |
| + path = _REQUEST_PATH % function_path |
| + |
| + headers = { |
| + 'authorization': 'Bearer ' + self._access_token, |
| + 'Content-Type': 'application/json' |
| + } |
| + body = json.dumps(request_data) |
| + logger.info('Making request to %s with body:\n%s' % \ |
| + (path, pprint.pformat(request_data))) |
| + conn.request('POST', path, body, headers) |
| + |
| + response = conn.getresponse() |
| + if response.status == 204: |
| + return {} |
| + if response.status != 200: |
| + logger.warning('Request to %s failed: %s' % (path, response.status)) |
| + return None |
| + return json.loads(response.read()) |