| Index: components/proximity_auth/e2e_test/cryptauth.py
|
| diff --git a/components/proximity_auth/e2e_test/cryptauth.py b/components/proximity_auth/e2e_test/cryptauth.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..7373fa9fb8f2f6e498826766b3ee00b70af286b5
|
| --- /dev/null
|
| +++ b/components/proximity_auth/e2e_test/cryptauth.py
|
| @@ -0,0 +1,140 @@
|
| +# Copyright 2015 The Chromium Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +import httplib
|
| +import json
|
| +import logging
|
| +import pprint
|
| +import time
|
| +
|
| +logger = logging.getLogger('proximity_auth.%s' % __name__)
|
| +
|
| +_GOOGLE_APIS_URL = 'www.googleapis.com'
|
| +_REQUEST_PATH = '/cryptauth/v1/%s?alt=JSON'
|
| +
|
| +class CryptAuthClient(object):
|
| + """ A client for making blocking CryptAuth API calls. """
|
| +
|
| + def __init__(self, access_token, google_apis_url=_GOOGLE_APIS_URL):
|
| + self._access_token = access_token
|
| + self._google_apis_url = google_apis_url
|
| +
|
| + def GetMyDevices(self):
|
| + """ Invokes the GetMyDevices API.
|
| +
|
| + Returns:
|
| + A list of devices or None if the API call fails.
|
| + Each device is a dictionary of the deserialized JSON returned by
|
| + CryptAuth.
|
| + """
|
| + request_data = {
|
| + 'approvedForUnlockRequired': False,
|
| + 'allowStaleRead': False,
|
| + 'invocationReason': 13 # REASON_MANUAL
|
| + }
|
| + response = self._SendRequest('deviceSync/getmydevices', request_data)
|
| + return response['devices'] if response is not None else None
|
| +
|
| + def GetUnlockKey(self):
|
| + """
|
| + Returns:
|
| + The unlock key registered with CryptAuth if it exists or None.
|
| + The device is a dictionary of the deserialized JSON returned by CryptAuth.
|
| + """
|
| + devices = self.GetMyDevices()
|
| + if devices is None:
|
| + return None
|
| +
|
| + for device in devices:
|
| + if device['unlockKey']:
|
| + return device
|
| + return None
|
| +
|
| + def ToggleEasyUnlock(self, enable, public_key=''):
|
| + """ Calls the ToggleEasyUnlock API.
|
| + Args:
|
| + enable: True to designate the device specified by |public_key| as an
|
| + unlock key.
|
| + public_key: The public key of the device to toggle. Ignored if |enable| is
|
| + False, which toggles all unlock keys off.
|
| + Returns:
|
| + True upon success, else False.
|
| + """
|
| + request_data = { 'enable': enable, }
|
| + if not enable:
|
| + request_data['applyToAll'] = True
|
| + else:
|
| + request_data['publicKey'] = public_key
|
| + response = self._SendRequest('deviceSync/toggleeasyunlock', request_data)
|
| + return response is not None
|
| +
|
| + def FindEligibleUnlockDevices(self, time_delta_millis=None):
|
| + """ Finds devices eligible to be an unlock key.
|
| + Args:
|
| + time_delta_millis: If specified, then only return eligible devices that
|
| + have contacted CryptAuth in the last time delta.
|
| + Returns:
|
| + A tuple containing two lists, one of eligible devices and the other of
|
| + ineligible devices.
|
| + Each device is a dictionary of the deserialized JSON returned by
|
| + CryptAuth.
|
| + """
|
| + request_data = {}
|
| + if time_delta_millis is not None:
|
| + request_data['maxLastUpdateTimeDeltaMillis'] = time_delta_millis * 1000;
|
| +
|
| + response = self._SendRequest(
|
| + 'deviceSync/findeligibleunlockdevices', request_data)
|
| + if response is None:
|
| + return None
|
| + eligibleDevices = (
|
| + response['eligibleDevices'] if 'eligibleDevices' in response else [])
|
| + ineligibleDevices = (
|
| + response['ineligibleDevices'] if (
|
| + 'ineligibleDevices' in response) else [])
|
| + return eligibleDevices, ineligibleDevices
|
| +
|
| + def PingPhones(self, timeout_secs=10):
|
| + """ Asks CryptAuth to ping registered phones and determine their status.
|
| + Args:
|
| + timeout_secs: The number of seconds to wait for phones to respond.
|
| + Returns:
|
| + A tuple containing two lists, one of eligible devices and the other of
|
| + ineligible devices.
|
| + Each device is a dictionary of the deserialized JSON returned by
|
| + CryptAuth.
|
| + """
|
| + response = self._SendRequest(
|
| + 'deviceSync/senddevicesynctickle',
|
| + { 'tickleType': 'updateEnrollment' })
|
| + if response is None:
|
| + return None
|
| + # We wait for phones to update their status with CryptAuth.
|
| + logger.info('Waiting for %s seconds for phone status...' % timeout_secs)
|
| + time.sleep(timeout_secs)
|
| + return self.FindEligibleUnlockDevices(time_delta_millis=timeout_secs)
|
| +
|
| + def _SendRequest(self, function_path, request_data):
|
| + """ Sends an HTTP request to CryptAuth and returns the deserialized
|
| + response.
|
| + """
|
| + conn = httplib.HTTPSConnection(self._google_apis_url)
|
| + path = _REQUEST_PATH % function_path
|
| +
|
| + headers = {
|
| + 'authorization': 'Bearer ' + self._access_token,
|
| + 'Content-Type': 'application/json'
|
| + }
|
| + body = json.dumps(request_data)
|
| + logger.info('Making request to %s with body:\n%s' % (
|
| + path, pprint.pformat(request_data)))
|
| + conn.request('POST', path, body, headers)
|
| +
|
| + response = conn.getresponse()
|
| + if response.status == 204:
|
| + return {}
|
| + if response.status != 200:
|
| + logger.warning('Request to %s failed: %s' % (path, response.status))
|
| + return None
|
| + return json.loads(response.read())
|
|
|