| Index: tests/s3/test_pool.py
|
| diff --git a/tests/s3/test_pool.py b/tests/s3/test_pool.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..ebb68c8556a5c615bf3ce2448294261cd4e7e96f
|
| --- /dev/null
|
| +++ b/tests/s3/test_pool.py
|
| @@ -0,0 +1,246 @@
|
| +# Copyright (c) 2011 Brian Beach
|
| +# All rights reserved.
|
| +#
|
| +# Permission is hereby granted, free of charge, to any person obtaining a
|
| +# copy of this software and associated documentation files (the
|
| +# "Software"), to deal in the Software without restriction, including
|
| +# without limitation the rights to use, copy, modify, merge, publish, dis-
|
| +# tribute, sublicense, and/or sell copies of the Software, and to permit
|
| +# persons to whom the Software is furnished to do so, subject to the fol-
|
| +# lowing conditions:
|
| +#
|
| +# The above copyright notice and this permission notice shall be included
|
| +# in all copies or substantial portions of the Software.
|
| +#
|
| +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
| +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
|
| +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
|
| +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
| +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
| +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
| +# IN THE SOFTWARE.
|
| +
|
| +"""
|
| +Some multi-threading tests of boto in a greenlet environment.
|
| +"""
|
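| +# These tests are written to run under gevent's monkey-patching (my
|
| +# reading of the docstrings below; gevent.monkey is a real gevent
|
| +# API), e.g. by doing this before boto is imported:
|
| +#
|
| +#     from gevent import monkey; monkey.patch_all()
|
| +#
|
| +# boto.connect_s3() also needs credentials from the usual boto
|
| +# sources, e.g. the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
|
| +# environment variables.
|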
| +
|
| +import boto
|
| +import time
|
| +import uuid
|
| +
|
| +from threading import Thread
|
| +
|
| +def spawn(function, *args, **kwargs):
|
| + """
|
| + Spawns a new thread. API is the same as
|
| + gevent.greenlet.Greenlet.spawn.
|
| + """
|
| + t = Thread(target = function, args = args, kwargs = kwargs)
|
| + t.start()
|
| + return t
|
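| +# Note: under gevent's monkey-patching (assuming monkey.patch_all()
|
| +# was applied, as noted at the top of the file) the Thread above runs
|
| +# as a greenlet. A native equivalent (for comparison only, not used in
|
| +# this file) would be gevent.spawn(function, *args, **kwargs), whose
|
| +# return value also supports join().
|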
| +
|
| +def put_object(bucket, name):
|
| +    bucket.new_key(name).set_contents_from_string(name)
|
| +
|
| +def get_object(bucket, name):
|
| +    assert bucket.get_key(name).get_contents_as_string() == name
|
| +
|
| +def test_close_connections():
|
| + """
|
| + A test that exposes the problem where connections are returned to the
|
| + connection pool (and closed) before the caller reads the response.
|
| +
|
| + I couldn't think of a way to test it without greenlets, so this test
|
| + doesn't run as part of the standard test suite. That way, no more
|
| + dependencies are added to the test suite.
|
| + """
|
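| +    # (The failure mode being probed, as I read the description above:
|
| +    # thread A hands its connection back to the pool while the response
|
| +    # body is still unread on the socket, and thread B then writes a
|
| +    # new request on that same socket.)
|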
| +
|
| + print "Running test_close_connections"
|
| +
|
| + # Connect to S3
|
| + s3 = boto.connect_s3()
|
| +
|
| + # Clean previous tests.
|
| + for b in s3.get_all_buckets():
|
| + if b.name.startswith('test-'):
|
| + for key in b.get_all_keys():
|
| + key.delete()
|
| + b.delete()
|
| +
|
| + # Make a test bucket
|
| + bucket = s3.create_bucket('test-%d' % int(time.time()))
|
| +
|
| + # Create 30 threads that each create an object in S3. The number
|
| + # 30 is chosen because it is larger than the connection pool size
|
| + # (20).
|
| + names = [str(uuid.uuid4) for _ in range(30)]
|
| + threads = [
|
| + spawn(put_object, bucket, name)
|
| + for name in names
|
| + ]
|
| + for t in threads:
|
| + t.join()
|
| +
|
| + # Create 30 threads to read the contents of the new objects. This
|
| + # is where closing the connection early is a problem, because
|
| + # there is a response that needs to be read, and it can't be read
|
| + # if the connection has already been closed.
|
| + threads = [
|
| + spawn(get_object, bucket, name)
|
| + for name in names
|
| + ]
|
| + for t in threads:
|
| + t.join()
|
| +
|
| +# test_reuse_connections needs to read a file that is big enough that
|
| +# one read() call on the socket won't read the whole thing.
|
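| +# (A common read buffer size for Python 2 sockets is 8192 bytes; that
|
| +# is an assumption about the platform rather than something this test
|
| +# checks, but it means 10000 bytes should take at least two reads.)
|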
| +BIG_SIZE = 10000
|
| +
|
| +class WriteAndCount(object):
|
| +
|
| + """
|
| + A file-like object that counts the number of characters written.
|
| + """
|
| +
|
| + def __init__(self):
|
| + self.size = 0
|
| +
|
| + def write(self, data):
|
| + self.size += len(data)
|
| + time.sleep(0) # yield to other threads
|
| +
|
| +def read_big_object(s3, bucket, name, count):
|
| +    for _ in range(count):
|
| +        key = bucket.get_key(name)
|
| +        out = WriteAndCount()
|
| +        key.get_contents_to_file(out)
|
| +        if out.size != BIG_SIZE:
|
| +            print out.size, BIG_SIZE
|
| +        assert out.size == BIG_SIZE
|
| +        print " pool size:", s3._pool.size()
|
| +
|
| +class LittleQuerier(object):
|
| +
|
| + """
|
| + An object that manages a thread that keeps pulling down small
|
| + objects from S3 and checking the answers until told to stop.
|
| + """
|
| +
|
| + def __init__(self, bucket, small_names):
|
| + self.running = True
|
| + self.bucket = bucket
|
| + self.small_names = small_names
|
| + self.thread = spawn(self.run)
|
| +
|
| + def stop(self):
|
| + self.running = False
|
| + self.thread.join()
|
| +
|
| + def run(self):
|
| + count = 0
|
| + while self.running:
|
| + i = count % 4
|
| + key = self.bucket.get_key(self.small_names[i])
|
| + expected = str(i)
|
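| +            # response-content-type is a standard S3 GET override
|
| +            # that asks S3 to send back a different Content-Type
|
| +            # header; presumably it is used here just to make each
|
| +            # request a little different from a plain GET.
|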
| +            rh = {'response-content-type': 'small/' + str(i)}
|
| +            actual = key.get_contents_as_string(response_headers=rh)
|
| +            if expected != actual:
|
| +                print "AHA:", repr(expected), repr(actual)
|
| +            assert expected == actual
|
| +            count += 1
|
| +
|
| +def test_reuse_connections():
|
| + """
|
| + This test is an attempt to expose problems because of the fact
|
| + that boto returns connections to the connection pool before
|
| + reading the response. The strategy is to start a couple big reads
|
| + from S3, where it will take time to read the response, and then
|
| + start other requests that will reuse the same connection from the
|
| + pool while the big response is still being read.
|
| +
|
| + The test passes because of an interesting combination of factors.
|
| + I was expecting that it would fail because two threads would be
|
| + reading the same connection at the same time. That doesn't happen
|
| + because httplib catches the problem before it happens and raises
|
| + an exception.
|
| +
|
| + Here's the sequence of events:
|
| +
|
| + - Thread 1: Send a request to read a big S3 object.
|
| + - Thread 1: Returns connection to pool.
|
| + - Thread 1: Start reading the body if the response.
|
| +
|
| + - Thread 2: Get the same connection from the pool.
|
| + - Thread 2: Send another request on the same connection.
|
| + - Thread 2: Try to read the response, but
|
| + HTTPConnection.get_response notices that the
|
| + previous response isn't done reading yet, and
|
| + raises a ResponseNotReady exception.
|
| + - Thread 2: _mexe catches the exception, does not return the
|
| + connection to the pool, gets a new connection, and
|
| + retries.
|
| +
|
| + - Thread 1: Finish reading the body of its response.
|
| +
|
| + - Server: Gets the second request on the connection, and
|
| + sends a response. This response is ignored because
|
| + the connection has been dropped on the client end.
|
| +
|
| + If you add a print statement in HTTPConnection.get_response at the
|
| + point where it raises ResponseNotReady, and then run this test,
|
| + you can see that it's happening.
|
| + """
|
| +
|
| + print "Running test_reuse_connections"
|
| +
|
| + # Connect to S3
|
| + s3 = boto.connect_s3()
|
| +
|
| + # Make a test bucket
|
| + bucket = s3.create_bucket('test-%d' % int(time.time()))
|
| +
|
| + # Create some small objects in S3.
|
| + small_names = [str(uuid.uuid4()) for _ in range(4)]
|
| + for (i, name) in enumerate(small_names):
|
| + bucket.new_key(name).set_contents_from_string(str(i))
|
| +
|
| + # Wait, clean the connection pool, and make sure it's empty.
|
| + print " waiting for all connections to become stale"
|
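| +    # (STALE_DURATION and clean() are the pool's own API: as I read it,
|
| +    # a connection unused for STALE_DURATION seconds is considered
|
| +    # stale and clean() discards such connections, so after the sleep
|
| +    # the pool should drain to zero.)
|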
| +    time.sleep(s3._pool.STALE_DURATION + 1)
|
| +    s3._pool.clean()
|
| +    assert s3._pool.size() == 0
|
| +    print " pool is empty"
|
| +
|
| +    # Create a big object in S3.
|
| +    big_name = str(uuid.uuid4())
|
| +    contents = "-" * BIG_SIZE
|
| +    bucket.new_key(big_name).set_contents_from_string(contents)
|
| +
|
| +    # Start some threads to read it and check that they are reading
|
| +    # the correct thing. Each thread will read the object 20 times.
|
| +    threads = [
|
| +        spawn(read_big_object, s3, bucket, big_name, 20)
|
| +        for _ in range(5)
|
| +        ]
|
| +
|
| +    # Do some other things that may (incorrectly) re-use the same
|
| +    # connections while the big objects are being read.
|
| +    queriers = [
|
| +        LittleQuerier(bucket, small_names)
|
| +        for _ in range(5)
|
| +        ]
|
| +
|
| +    # Clean up.
|
| +    for t in threads:
|
| +        t.join()
|
| +    for q in queriers:
|
| +        q.stop()
|
| +
|
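| +# The docstring in test_reuse_connections suggests adding a print
|
| +# statement inside httplib itself. A less invasive sketch (this helper
|
| +# is my own illustration, not part of boto or of these tests) is to
|
| +# monkey-patch HTTPConnection.getresponse and log each ResponseNotReady
|
| +# before boto's _mexe retries; call it at the top of main() to watch.
|
| +
|
| +def trace_response_not_ready():
|
| +    import httplib
|
| +    real_getresponse = httplib.HTTPConnection.getresponse
|
| +    def traced_getresponse(self, *args, **kwargs):
|
| +        try:
|
| +            return real_getresponse(self, *args, **kwargs)
|
| +        except httplib.ResponseNotReady:
|
| +            print "ResponseNotReady raised; expecting a retry in _mexe"
|
| +            raise
|
| +    httplib.HTTPConnection.getresponse = traced_getresponse
|
| +
|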
| +def main():
|
| +    test_close_connections()
|
| +    test_reuse_connections()
|
| +
|
| +if __name__ == '__main__':
|
| +    main()
|