Chromium Code Reviews

Unified Diff: tests/s3/test_pool.py

Issue 8386013: Merging in latest boto. (Closed) Base URL: svn://svn.chromium.org/boto
Patch Set: Redoing vendor drop by deleting and then merging. Created 9 years, 2 months ago
Index: tests/s3/test_pool.py
diff --git a/tests/s3/test_pool.py b/tests/s3/test_pool.py
new file mode 100644
index 0000000000000000000000000000000000000000..ebb68c8556a5c615bf3ce2448294261cd4e7e96f
--- /dev/null
+++ b/tests/s3/test_pool.py
@@ -0,0 +1,246 @@
+# Copyright (c) 2011 Brian Beach
+# All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+"""
+Some multi-threading tests of boto in a greenlet environment.
+"""
+
+import boto
+import time
+import uuid
+
+from threading import Thread
+
+def spawn(function, *args, **kwargs):
+    """
+    Spawns a new thread.  API is the same as
+    gevent.greenlet.Greenlet.spawn.
+    """
+    t = Thread(target=function, args=args, kwargs=kwargs)
+    t.start()
+    return t
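+
+# A hedged sketch (an assumption, not part of the original patch): to run
+# these tests in an actual greenlet environment, the thread-based spawn
+# above could be swapped for gevent's, e.g.:
+#
+#     import gevent
+#
+#     def spawn(function, *args, **kwargs):
+#         return gevent.spawn(function, *args, **kwargs)
+#
+# gevent greenlets expose join() just like threading.Thread, so the call
+# sites below would not need to change.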
+
+def put_object(bucket, name):
+    bucket.new_key(name).set_contents_from_string(name)
+
+def get_object(bucket, name):
+    assert bucket.get_key(name).get_contents_as_string() == name
+
+def test_close_connections():
+    """
+    A test that exposes the problem where connections are returned to the
+    connection pool (and closed) before the caller reads the response.
+
+    I couldn't think of a way to test it without greenlets, so this
+    test doesn't run as part of the standard test suite.  That keeps
+    the standard suite from picking up any new dependencies.
+    """
+
+    print "Running test_close_connections"
+
+    # Connect to S3
+    s3 = boto.connect_s3()
+
+    # Clean up from any previous run of this test.
+    for b in s3.get_all_buckets():
+        if b.name.startswith('test-'):
+            for key in b.get_all_keys():
+                key.delete()
+            b.delete()
+
+    # Make a test bucket.
+    bucket = s3.create_bucket('test-%d' % int(time.time()))
+
+    # Create 30 threads that each create an object in S3.  The number
+    # 30 is chosen because it is larger than the connection pool size
+    # (20).
+    names = [str(uuid.uuid4()) for _ in range(30)]
+    threads = [
+        spawn(put_object, bucket, name)
+        for name in names
+        ]
+    for t in threads:
+        t.join()
+
+    # Create 30 threads to read the contents of the new objects.  This
+    # is where closing the connection early is a problem, because
+    # there is a response that needs to be read, and it can't be read
+    # if the connection has already been closed.
+    threads = [
+        spawn(get_object, bucket, name)
+        for name in names
+        ]
+    for t in threads:
+        t.join()
+
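+# A minimal illustration (an assumption, not part of the original patch)
+# of the failure mode test_close_connections is after: if a connection is
+# closed before its response body has been consumed, the read comes back
+# truncated.
+#
+#     import httplib
+#
+#     conn = httplib.HTTPConnection('example.com')
+#     conn.request('GET', '/')
+#     response = conn.getresponse()
+#     conn.close()             # returned to the pool and closed too early
+#     body = response.read()   # in Python 2's httplib this returns ''
+#                              # because the underlying file is gone
+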
+# test_reuse_connections needs to read a file that is big enough that
+# one read() call on the socket won't read the whole thing.
+BIG_SIZE = 10000
+
+class WriteAndCount(object):
+
+    """
+    A file-like object that counts the number of characters written.
+    """
+
+    def __init__(self):
+        self.size = 0
+
+    def write(self, data):
+        self.size += len(data)
+        time.sleep(0)  # yield to other threads
+
+def read_big_object(s3, bucket, name, count):
+    for _ in range(count):
+        key = bucket.get_key(name)
+        out = WriteAndCount()
+        key.get_contents_to_file(out)
+        if out.size != BIG_SIZE:
+            print out.size, BIG_SIZE
+        assert out.size == BIG_SIZE
+        print " pool size:", s3._pool.size()
+
+class LittleQuerier(object):
+
+    """
+    An object that manages a thread that keeps pulling down small
+    objects from S3 and checking the answers until told to stop.
+    """
+
+    def __init__(self, bucket, small_names):
+        self.running = True
+        self.bucket = bucket
+        self.small_names = small_names
+        self.thread = spawn(self.run)
+
+    def stop(self):
+        self.running = False
+        self.thread.join()
+
+    def run(self):
+        count = 0
+        while self.running:
+            i = count % 4
+            key = self.bucket.get_key(self.small_names[i])
+            expected = str(i)
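+            # response_headers asks S3 to override headers on the GET
+            # response; boto sends these as query-string parameters
+            # (here response-content-type), presumably so that each
+            # request differs and a cross-wired response is detectable.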
+            rh = {'response-content-type': 'small/' + str(i)}
+            actual = key.get_contents_as_string(response_headers=rh)
+            if expected != actual:
+                print "AHA:", repr(expected), repr(actual)
+            assert expected == actual
+            count += 1
+
+def test_reuse_connections():
+    """
+    This test is an attempt to expose problems caused by boto
+    returning connections to the connection pool before reading the
+    response.  The strategy is to start a couple of big reads from
+    S3, where it will take time to read the response, and then start
+    other requests that will reuse the same connection from the pool
+    while the big response is still being read.
+
+    The test passes because of an interesting combination of factors.
+    I was expecting it to fail because two threads would read from the
+    same connection at the same time.  That doesn't happen, because
+    httplib detects the conflict before it can occur and raises an
+    exception.
+
+    Here's the sequence of events:
+
+    - Thread 1: Sends a request to read a big S3 object.
+    - Thread 1: Returns the connection to the pool.
+    - Thread 1: Starts reading the body of the response.
+
+    - Thread 2: Gets the same connection from the pool.
+    - Thread 2: Sends another request on the same connection.
+    - Thread 2: Tries to read the response, but
+                HTTPConnection.getresponse notices that the previous
+                response isn't done being read yet, and raises a
+                ResponseNotReady exception.
+    - Thread 2: _mexe catches the exception, does not return the
+                connection to the pool, gets a new connection, and
+                retries.
+
+    - Thread 1: Finishes reading the body of its response.
+
+    - Server:   Gets the second request on the connection and sends a
+                response.  This response is ignored because the
+                connection has been dropped on the client end.
+
+    If you add a print statement in HTTPConnection.getresponse at the
+    point where it raises ResponseNotReady and then run this test, you
+    can see it happening.
+    """
+
+ print "Running test_reuse_connections"
+
+ # Connect to S3
+ s3 = boto.connect_s3()
+
+ # Make a test bucket
+ bucket = s3.create_bucket('test-%d' % int(time.time()))
+
+ # Create some small objects in S3.
+ small_names = [str(uuid.uuid4()) for _ in range(4)]
+ for (i, name) in enumerate(small_names):
+ bucket.new_key(name).set_contents_from_string(str(i))
+
+ # Wait, clean the connection pool, and make sure it's empty.
+ print " waiting for all connections to become stale"
+ time.sleep(s3._pool.STALE_DURATION + 1)
+ s3._pool.clean()
+ assert s3._pool.size() == 0
+ print " pool is empty"
+
+ # Create a big object in S3.
+ big_name = str(uuid.uuid4())
+ contents = "-" * BIG_SIZE
+ bucket.new_key(big_name).set_contents_from_string(contents)
+
+ # Start some threads to read it and check that they are reading
+ # the correct thing. Each thread will read the object 40 times.
+ threads = [
+ spawn(read_big_object, s3, bucket, big_name, 20)
+ for _ in range(5)
+ ]
+
+ # Do some other things that may (incorrectly) re-use the same
+ # connections while the big objects are being read.
+ queriers = [
+ LittleQuerier(bucket, small_names)
+ for _ in range(5)
+ ]
+
+ # Clean up.
+ for t in threads:
+ t.join()
+ for q in queriers:
+ q.stop()
+
+def main():
+    test_close_connections()
+    test_reuse_connections()
+
+if __name__ == '__main__':
+    main()