# Copyright (c) 2011 Brian Beach
# All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

"""
Some multi-threading tests of boto in a greenlet environment.
"""

import boto
import time
import uuid

from StringIO import StringIO

from threading import Thread

def spawn(function, *args, **kwargs):
    """
    Spawns a new thread. API is the same as
    gevent.greenlet.Greenlet.spawn.
    """
    t = Thread(target=function, args=args, kwargs=kwargs)
    t.start()
    return t
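
# Illustrative usage only (bucket stands in for the boto bucket objects
# created by the tests below):
#
#   t = spawn(put_object, bucket, 'example-key')
#   t.join()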

def put_object(bucket, name):
    bucket.new_key(name).set_contents_from_string(name)

def get_object(bucket, name):
    assert bucket.get_key(name).get_contents_as_string() == name

def test_close_connections():
    """
    A test that exposes the problem where connections are returned to the
    connection pool (and closed) before the caller reads the response.

    I couldn't think of a way to test it without greenlets, so this test
    doesn't run as part of the standard test suite. That way, no more
    dependencies are added to the test suite.
    """

    print "Running test_close_connections"

    # Connect to S3
    s3 = boto.connect_s3()

    # Clean previous tests.
    for b in s3.get_all_buckets():
        if b.name.startswith('test-'):
            for key in b.get_all_keys():
                key.delete()
            b.delete()

    # Make a test bucket
    bucket = s3.create_bucket('test-%d' % int(time.time()))

    # Create 30 threads that each create an object in S3. The number
    # 30 is chosen because it is larger than the connection pool size
    # (20).
    names = [str(uuid.uuid4()) for _ in range(30)]
    threads = [
        spawn(put_object, bucket, name)
        for name in names
    ]
    for t in threads:
        t.join()

    # Create 30 threads to read the contents of the new objects. This
    # is where closing the connection early is a problem, because
    # there is a response that needs to be read, and it can't be read
    # if the connection has already been closed.
    threads = [
        spawn(get_object, bucket, name)
        for name in names
    ]
    for t in threads:
        t.join()

# test_reuse_connections needs to read a file that is big enough that
# one read() call on the socket won't read the whole thing.
BIG_SIZE = 10000

class WriteAndCount(object):

    """
    A file-like object that counts the number of characters written.
    """

    def __init__(self):
        self.size = 0

    def write(self, data):
        self.size += len(data)
        time.sleep(0)  # yield to other threads

def read_big_object(s3, bucket, name, count):
    for _ in range(count):
        key = bucket.get_key(name)
        out = WriteAndCount()
        key.get_contents_to_file(out)
        if out.size != BIG_SIZE:
            print out.size, BIG_SIZE
        assert out.size == BIG_SIZE
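        # Peek at boto's connection pool (s3._pool is a private attribute)
        # to see how many pooled connections are alive at this point.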
        print " pool size:", s3._pool.size()

class LittleQuerier(object):

    """
    An object that manages a thread that keeps pulling down small
    objects from S3 and checking the answers until told to stop.
    """

    def __init__(self, bucket, small_names):
        self.running = True
        self.bucket = bucket
        self.small_names = small_names
        self.thread = spawn(self.run)

    def stop(self):
        self.running = False
        self.thread.join()

    def run(self):
        count = 0
        while self.running:
            i = count % 4
            key = self.bucket.get_key(self.small_names[i])
            expected = str(i)
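            # Ask S3 to override the Content-Type of the response. The
            # particular value is arbitrary; as far as I can tell the point
            # is just to make each GET slightly different.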
            rh = {'response-content-type': 'small/' + str(i)}
            actual = key.get_contents_as_string(response_headers=rh)
            if expected != actual:
                print "AHA:", repr(expected), repr(actual)
            assert expected == actual
            count += 1

def test_reuse_connections():
    """
    This test is an attempt to expose problems caused by boto returning
    connections to the connection pool before the response has been
    read. The strategy is to start a couple of big reads from S3, where
    it will take time to read the response, and then start other
    requests that will reuse the same connection from the pool while the
    big response is still being read.

    The test passes because of an interesting combination of factors.
    I was expecting that it would fail because two threads would be
    reading the same connection at the same time. That doesn't happen
    because httplib catches the problem before it happens and raises
    an exception.

    Here's the sequence of events:

    - Thread 1: Send a request to read a big S3 object.
    - Thread 1: Return the connection to the pool.
    - Thread 1: Start reading the body of the response.

    - Thread 2: Get the same connection from the pool.
    - Thread 2: Send another request on the same connection.
    - Thread 2: Try to read the response, but
                HTTPConnection.getresponse notices that the
                previous response isn't done being read yet, and
                raises a ResponseNotReady exception.
    - Thread 2: _mexe catches the exception, does not return the
                connection to the pool, gets a new connection, and
                retries.

    - Thread 1: Finish reading the body of its response.

    - Server: Gets the second request on the connection, and
              sends a response. This response is ignored because
              the connection has been dropped on the client end.

    If you add a print statement in HTTPConnection.getresponse at the
    point where it raises ResponseNotReady, and then run this test,
    you can see that it's happening.
    """
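
    # One way to observe the retry path described above (a debugging
    # sketch, not something the test relies on) is to monkey-patch
    # httplib before running:
    #
    #   import httplib
    #   _orig = httplib.HTTPConnection.getresponse
    #   def noisy_getresponse(self, *args, **kwargs):
    #       try:
    #           return _orig(self, *args, **kwargs)
    #       except httplib.ResponseNotReady:
    #           print "ResponseNotReady on", self
    #           raise
    #   httplib.HTTPConnection.getresponse = noisy_getresponse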

    print "Running test_reuse_connections"

    # Connect to S3
    s3 = boto.connect_s3()

    # Make a test bucket
    bucket = s3.create_bucket('test-%d' % int(time.time()))

    # Create some small objects in S3.
    small_names = [str(uuid.uuid4()) for _ in range(4)]
    for (i, name) in enumerate(small_names):
        bucket.new_key(name).set_contents_from_string(str(i))

    # Wait, clean the connection pool, and make sure it's empty.
    print " waiting for all connections to become stale"
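    # STALE_DURATION is the pool's idle timeout (in seconds); after waiting
    # at least that long, clean() should be able to discard every connection.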
    time.sleep(s3._pool.STALE_DURATION + 1)
    s3._pool.clean()
    assert s3._pool.size() == 0
    print " pool is empty"

    # Create a big object in S3.
    big_name = str(uuid.uuid4())
    contents = "-" * BIG_SIZE
    bucket.new_key(big_name).set_contents_from_string(contents)

    # Start some threads to read it and check that they are reading
    # the correct thing. Each thread will read the object 20 times.
    threads = [
        spawn(read_big_object, s3, bucket, big_name, 20)
        for _ in range(5)
    ]

    # Do some other things that may (incorrectly) re-use the same
    # connections while the big objects are being read.
    queriers = [
        LittleQuerier(bucket, small_names)
        for _ in range(5)
    ]

    # Clean up.
    for t in threads:
        t.join()
    for q in queriers:
        q.stop()

def main():
    test_close_connections()
    test_reuse_connections()

if __name__ == '__main__':
    main()