OLD | NEW |
| (Empty) |
1 #!/usr/bin/env python | |
2 | |
3 # Copyright (c) 2006-2010 Mitch Garnaat http://garnaat.org/ | |
4 # Copyright (c) 2010, Eucalyptus Systems, Inc. | |
5 # All rights reserved. | |
6 # | |
7 # Permission is hereby granted, free of charge, to any person obtaining a | |
8 # copy of this software and associated documentation files (the | |
9 # "Software"), to deal in the Software without restriction, including | |
10 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
11 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
12 # persons to whom the Software is furnished to do so, subject to the fol- | |
13 # lowing conditions: | |
14 # | |
15 # The above copyright notice and this permission notice shall be included | |
16 # in all copies or substantial portions of the Software. | |
17 # | |
18 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
19 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
20 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
21 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
22 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
23 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
24 # IN THE SOFTWARE. | |
25 | |
26 """ | |
27 Some unit tests for the SQSConnection | |
28 """ | |
29 | |
30 import unittest | |
31 import time | |
32 from boto.sqs.connection import SQSConnection | |
33 from boto.sqs.message import MHMessage | |
34 from boto.exception import SQSError | |
35 | |
36 class SQSConnectionTest (unittest.TestCase): | |
37 | |
38 def test_1_basic(self): | |
39 print '--- running SQSConnection tests ---' | |
40 c = SQSConnection() | |
41 rs = c.get_all_queues() | |
42 num_queues = 0 | |
43 for q in rs: | |
44 num_queues += 1 | |
45 | |
46 # try illegal name | |
47 try: | |
48 queue = c.create_queue('bad_queue_name') | |
49 except SQSError: | |
50 pass | |
51 | |
52 # now create one that should work and should be unique (i.e. a new one) | |
53 queue_name = 'test%d' % int(time.time()) | |
54 timeout = 60 | |
55 queue = c.create_queue(queue_name, timeout) | |
56 time.sleep(60) | |
57 rs = c.get_all_queues() | |
58 i = 0 | |
59 for q in rs: | |
60 i += 1 | |
61 assert i == num_queues+1 | |
62 assert queue.count_slow() == 0 | |
63 | |
64 # check the visibility timeout | |
65 t = queue.get_timeout() | |
66 assert t == timeout, '%d != %d' % (t, timeout) | |
67 | |
68 # now try to get queue attributes | |
69 a = q.get_attributes() | |
70 assert a.has_key('ApproximateNumberOfMessages') | |
71 assert a.has_key('VisibilityTimeout') | |
72 a = q.get_attributes('ApproximateNumberOfMessages') | |
73 assert a.has_key('ApproximateNumberOfMessages') | |
74 assert not a.has_key('VisibilityTimeout') | |
75 a = q.get_attributes('VisibilityTimeout') | |
76 assert not a.has_key('ApproximateNumberOfMessages') | |
77 assert a.has_key('VisibilityTimeout') | |
78 | |
79 # now change the visibility timeout | |
80 timeout = 45 | |
81 queue.set_timeout(timeout) | |
82 time.sleep(60) | |
83 t = queue.get_timeout() | |
84 assert t == timeout, '%d != %d' % (t, timeout) | |
85 | |
86 # now add a message | |
87 message_body = 'This is a test\n' | |
88 message = queue.new_message(message_body) | |
89 queue.write(message) | |
90 time.sleep(60) | |
91 assert queue.count_slow() == 1 | |
92 time.sleep(90) | |
93 | |
94 # now read the message from the queue with a 10 second timeout | |
95 message = queue.read(visibility_timeout=10) | |
96 assert message | |
97 assert message.get_body() == message_body | |
98 | |
99 # now immediately try another read, shouldn't find anything | |
100 message = queue.read() | |
101 assert message == None | |
102 | |
103 # now wait 30 seconds and try again | |
104 time.sleep(30) | |
105 message = queue.read() | |
106 assert message | |
107 | |
108 # now delete the message | |
109 queue.delete_message(message) | |
110 time.sleep(30) | |
111 assert queue.count_slow() == 0 | |
112 | |
113 # create another queue so we can test force deletion | |
114 # we will also test MHMessage with this queue | |
115 queue_name = 'test%d' % int(time.time()) | |
116 timeout = 60 | |
117 queue = c.create_queue(queue_name, timeout) | |
118 queue.set_message_class(MHMessage) | |
119 time.sleep(30) | |
120 | |
121 # now add a couple of messages | |
122 message = queue.new_message() | |
123 message['foo'] = 'bar' | |
124 queue.write(message) | |
125 message_body = {'fie' : 'baz', 'foo' : 'bar'} | |
126 message = queue.new_message(body=message_body) | |
127 queue.write(message) | |
128 time.sleep(30) | |
129 | |
130 m = queue.read() | |
131 assert m['foo'] == 'bar' | |
132 | |
133 # now delete that queue and messages | |
134 c.delete_queue(queue, True) | |
135 | |
136 print '--- tests completed ---' | |
137 | |
OLD | NEW |