Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: tests/s3/test_pool.py

Issue 8386013: Merging in latest boto. (Closed) Base URL: svn://svn.chromium.org/boto
Patch Set: Redoing vendor drop by deleting and then merging. Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « tests/s3/test_https_cert_validation.py ('k') | tests/s3/test_resumable_downloads.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright (c) 2011 Brian Beach
2 # All rights reserved.
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining a
5 # copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish, dis-
8 # tribute, sublicense, and/or sell copies of the Software, and to permit
9 # persons to whom the Software is furnished to do so, subject to the fol-
10 # lowing conditions:
11 #
12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 # IN THE SOFTWARE.
22
23 """
24 Some multi-threading tests of boto in a greenlet environment.
25 """
26
27 import boto
28 import time
29 import uuid
30
31 from StringIO import StringIO
32
33 from threading import Thread
34
def spawn(function, *args, **kwargs):
    """
    Start function(*args, **kwargs) on a new thread and return the thread.

    The call signature mirrors gevent.greenlet.Greenlet.spawn so the two
    can be used interchangeably in these tests.
    """
    worker = Thread(target=function, args=args, kwargs=kwargs)
    worker.start()
    return worker
43
def put_object(bucket, name):
    # Store an S3 object whose body is simply its own key name, so that
    # get_object() can later verify the round trip without extra state.
    key = bucket.new_key(name)
    key.set_contents_from_string(name)
46
def get_object(bucket, name):
    # put_object() wrote the key name as the body; check the round trip.
    contents = bucket.get_key(name).get_contents_as_string()
    assert contents == name
49
50 def test_close_connections():
51 """
52 A test that exposes the problem where connections are returned to the
53 connection pool (and closed) before the caller reads the response.
54
55 I couldn't think of a way to test it without greenlets, so this test
56 doesn't run as part of the standard test suite. That way, no more
57 dependencies are added to the test suite.
58 """
59
60 print "Running test_close_connections"
61
62 # Connect to S3
63 s3 = boto.connect_s3()
64
65 # Clean previous tests.
66 for b in s3.get_all_buckets():
67 if b.name.startswith('test-'):
68 for key in b.get_all_keys():
69 key.delete()
70 b.delete()
71
72 # Make a test bucket
73 bucket = s3.create_bucket('test-%d' % int(time.time()))
74
75 # Create 30 threads that each create an object in S3. The number
76 # 30 is chosen because it is larger than the connection pool size
77 # (20).
78 names = [str(uuid.uuid4) for _ in range(30)]
79 threads = [
80 spawn(put_object, bucket, name)
81 for name in names
82 ]
83 for t in threads:
84 t.join()
85
86 # Create 30 threads to read the contents of the new objects. This
87 # is where closing the connection early is a problem, because
88 # there is a response that needs to be read, and it can't be read
89 # if the connection has already been closed.
90 threads = [
91 spawn(get_object, bucket, name)
92 for name in names
93 ]
94 for t in threads:
95 t.join()
96
# test_reuse_connections needs to read a file that is big enough that
# one read() call on the socket won't read the whole thing, so that a
# response is still partially unread while other threads hit the pool.
BIG_SIZE = 10000
100
class WriteAndCount(object):

    """
    File-like sink that discards the data written to it but remembers
    how many characters it has received.
    """

    def __init__(self):
        # Running total of characters written so far.
        self.size = 0

    def write(self, data):
        self.size = self.size + len(data)
        # Sleep for zero seconds purely to yield to other threads,
        # maximizing interleaving while a big response is being read.
        time.sleep(0)
113
114 def read_big_object(s3, bucket, name, count):
115 for _ in range(count):
116 key = bucket.get_key(name)
117 out = WriteAndCount()
118 key.get_contents_to_file(out)
119 if out.size != BIG_SIZE:
120 print out.size, BIG_SIZE
121 assert out.size == BIG_SIZE
122 print " pool size:", s3._pool.size()
123
124 class LittleQuerier(object):
125
126 """
127 An object that manages a thread that keeps pulling down small
128 objects from S3 and checking the answers until told to stop.
129 """
130
131 def __init__(self, bucket, small_names):
132 self.running = True
133 self.bucket = bucket
134 self.small_names = small_names
135 self.thread = spawn(self.run)
136
137 def stop(self):
138 self.running = False
139 self.thread.join()
140
141 def run(self):
142 count = 0
143 while self.running:
144 i = count % 4
145 key = self.bucket.get_key(self.small_names[i])
146 expected = str(i)
147 rh = { 'response-content-type' : 'small/' + str(i) }
148 actual = key.get_contents_as_string(response_headers = rh)
149 if expected != actual:
150 print "AHA:", repr(expected), repr(actual)
151 assert expected == actual
152 count += 1
153
def test_reuse_connections():
    """
    This test is an attempt to expose problems because of the fact
    that boto returns connections to the connection pool before
    reading the response.  The strategy is to start a couple big reads
    from S3, where it will take time to read the response, and then
    start other requests that will reuse the same connection from the
    pool while the big response is still being read.

    The test passes because of an interesting combination of factors.
    I was expecting that it would fail because two threads would be
    reading the same connection at the same time.  That doesn't happen
    because httplib catches the problem before it happens and raises
    an exception.

    Here's the sequence of events:

      - Thread 1: Send a request to read a big S3 object.
      - Thread 1: Returns connection to pool.
      - Thread 1: Start reading the body of the response.

      - Thread 2: Get the same connection from the pool.
      - Thread 2: Send another request on the same connection.
      - Thread 2: Try to read the response, but
                  HTTPConnection.get_response notices that the
                  previous response isn't done reading yet, and
                  raises a ResponseNotReady exception.
      - Thread 2: _mexe catches the exception, does not return the
                  connection to the pool, gets a new connection, and
                  retries.

      - Thread 1: Finish reading the body of its response.

      - Server:   Gets the second request on the connection, and
                  sends a response.  This response is ignored because
                  the connection has been dropped on the client end.

    If you add a print statement in HTTPConnection.get_response at the
    point where it raises ResponseNotReady, and then run this test,
    you can see that it's happening.
    """

    print "Running test_reuse_connections"

    # Connect to S3
    s3 = boto.connect_s3()

    # Make a test bucket
    bucket = s3.create_bucket('test-%d' % int(time.time()))

    # Create some small objects in S3, each containing its own index as
    # its body so the reader threads can check what they get back.
    small_names = [str(uuid.uuid4()) for _ in range(4)]
    for (i, name) in enumerate(small_names):
        bucket.new_key(name).set_contents_from_string(str(i))

    # Wait, clean the connection pool, and make sure it's empty, so the
    # test starts from a known pool state.
    print " waiting for all connections to become stale"
    time.sleep(s3._pool.STALE_DURATION + 1)
    s3._pool.clean()
    assert s3._pool.size() == 0
    print " pool is empty"

    # Create a big object in S3.  BIG_SIZE is large enough that a single
    # socket read() won't consume the whole response.
    big_name = str(uuid.uuid4())
    contents = "-" * BIG_SIZE
    bucket.new_key(big_name).set_contents_from_string(contents)

    # Start some threads to read it and check that they are reading
    # the correct thing.  Each thread will read the object 20 times.
    threads = [
        spawn(read_big_object, s3, bucket, big_name, 20)
        for _ in range(5)
    ]

    # Do some other things that may (incorrectly) re-use the same
    # connections while the big objects are being read.
    queriers = [
        LittleQuerier(bucket, small_names)
        for _ in range(5)
    ]

    # Clean up: wait for the big readers to finish, then tell the
    # queriers to stop looping and join their threads.
    for t in threads:
        t.join()
    for q in queriers:
        q.stop()
240
def main():
    """Run every pool test, in order."""
    for test in (test_close_connections, test_reuse_connections):
        test()

if __name__ == '__main__':
    main()
OLDNEW
« no previous file with comments | « tests/s3/test_https_cert_validation.py ('k') | tests/s3/test_resumable_downloads.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698