OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/python |
| 2 |
| 3 import urllib2 |
| 4 import sys |
| 5 import threading |
| 6 from urlparse import urljoin |
| 7 from xml.dom import minidom |
| 8 |
| 9 UPDATE_BLOB="""\ |
| 10 <?xml version="1.0" encoding="UTF-8"?> |
| 11 <o:gupdate |
| 12 xmlns:o="http://www.google.com/update2/request" |
| 13 version="MementoSoftwareUpdate-0.1.0.0" |
| 14 protocol="2.0" |
| 15 machineid="{1B0A13AC-7004-638C-3CA6-EC082E8F5DE9}" |
| 16 ismachine="0" |
| 17 userid="{bogus}"> |
| 18 <o:os version="Memento" |
| 19 platform="memento" |
| 20 sp="ForcedUpdate_i686"> |
| 21 </o:os> |
| 22 <o:app appid="{87efface-864d-49a5-9bb3-4b050a7c227a}" |
| 23 version="ForcedUpdate" |
| 24 lang="en-us" |
| 25 brand="GGLG" |
| 26 track="developer-build" |
| 27 board="x86-generic"> |
| 28 <o:ping active="0"></o:ping> |
| 29 <o:updatecheck></o:updatecheck> |
| 30 </o:app> |
| 31 </o:gupdate> |
| 32 """ |
| 33 |
| 34 UPDATE_URL = 'http://localhost:8080/update/' |
| 35 |
| 36 def do_ping(): |
| 37 update_ping = urllib2.Request(UPDATE_URL, update_blob) |
| 38 update_ping.add_header('Content-Type', 'text/xml') |
| 39 print urllib2.urlopen(update_ping).read() |
| 40 #TODO assert noupdate |
| 41 |
| 42 def do_version_ping(): |
| 43 url = UPDATE_URL + 'LATEST' |
| 44 update_ping = urllib2.Request(url, UPDATE_BLOB) |
| 45 update_ping.add_header('Content-Type', 'text/xml') |
| 46 response = urllib2.urlopen(update_ping).read() |
| 47 assert _verify_response(response), 'couldn\'t fetch update file' |
| 48 print 'Successfully pinged updateserver.' |
| 49 |
| 50 def do_badversion_ping(): |
| 51 url = UPDATE_URL + 'BADVERSION' |
| 52 update_ping = urllib2.Request(url, UPDATE_BLOB) |
| 53 update_ping.add_header('Content-Type', 'text/xml') |
| 54 response = urllib2.urlopen(update_ping).read() |
| 55 assert ('noupdate' in response) |
| 56 print 'requesting bogus version returns noupdate.' |
| 57 |
| 58 def _verify_download(url, content_length): |
| 59 # Eventually, verify something about the update. For now, |
| 60 # sanity-check its size. |
| 61 f = urllib2.urlopen(url) |
| 62 data = f.read(1024 * 1024) |
| 63 length = len(data) |
| 64 while data: |
| 65 data = f.read(1024 * 1024) |
| 66 length += len(data) |
| 67 assert content_length == length |
| 68 print 'Got a valid download.' |
| 69 return True |
| 70 |
| 71 def _verify_response(data): |
| 72 update_dom = minidom.parseString(data) |
| 73 print data |
| 74 root = update_dom.firstChild |
| 75 update_info = root.getElementsByTagName('updatecheck')[0] |
| 76 update_url = update_info.getAttribute('codebase') |
| 77 hash = update_info.getAttribute('hash') |
| 78 head_request = urllib2.Request(update_url) |
| 79 head_request.get_method = lambda: 'HEAD' |
| 80 try: |
| 81 fd = urllib2.urlopen(head_request) |
| 82 except urllib2.HTTPError, e: |
| 83 print 'FAILED: unable to retrieve %s\n\t%s' % (update_url, e) |
| 84 return False |
| 85 length = int(fd.headers.getheaders('Content-Length')[0]) |
| 86 assert length > 0 |
| 87 print 'Got a valid update response.' |
| 88 fd.close() |
| 89 assert (urllib2.urlopen(urljoin(update_url, 'cksum')).read() == hash) |
| 90 print 'Update cksum matched the one in the update XML.' |
| 91 return _verify_download(update_url, length) |
| 92 |
| 93 def test(num_clients): |
| 94 # Fake some concurrent requests for each autoupdate operation. |
| 95 for clients in range(num_clients): |
| 96 for op in (do_version_ping, do_badversion_ping): |
| 97 t = threading.Thread(target=op) |
| 98 t.start() |
| 99 |
| 100 if __name__ == '__main__': |
| 101 if len(sys.argv) > 1: |
| 102 num_clients = int(sys.argv[1]) |
| 103 else: |
| 104 num_clients = 1 |
| 105 test(num_clients) |
OLD | NEW |