Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(373)

Side by Side Diff: third_party/gsutil/boto/sqs/queue.py

Issue 12042069: Scripts to download files from google storage based on sha1 sums (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: Removed gsutil/tests and gsutil/docs Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21
22 """
23 Represents an SQS Queue
24 """
25
26 import urlparse
27 from boto.sqs.message import Message
28
29
30 class Queue:
31
32 def __init__(self, connection=None, url=None, message_class=Message):
33 self.connection = connection
34 self.url = url
35 self.message_class = message_class
36 self.visibility_timeout = None
37
38 def __repr__(self):
39 return 'Queue(%s)' % self.url
40
41 def _id(self):
42 if self.url:
43 val = urlparse.urlparse(self.url)[2]
44 else:
45 val = self.url
46 return val
47 id = property(_id)
48
49 def _name(self):
50 if self.url:
51 val = urlparse.urlparse(self.url)[2].split('/')[2]
52 else:
53 val = self.url
54 return val
55 name = property(_name)
56
57 def startElement(self, name, attrs, connection):
58 return None
59
60 def endElement(self, name, value, connection):
61 if name == 'QueueUrl':
62 self.url = value
63 elif name == 'VisibilityTimeout':
64 self.visibility_timeout = int(value)
65 else:
66 setattr(self, name, value)
67
68 def set_message_class(self, message_class):
69 """
70 Set the message class that should be used when instantiating
71 messages read from the queue. By default, the class
72 :class:`boto.sqs.message.Message` is used but this can be overriden
73 with any class that behaves like a message.
74
75 :type message_class: Message-like class
76 :param message_class: The new Message class
77 """
78 self.message_class = message_class
79
80 def get_attributes(self, attributes='All'):
81 """
82 Retrieves attributes about this queue object and returns
83 them in an Attribute instance (subclass of a Dictionary).
84
85 :type attributes: string
86 :param attributes: String containing one of:
87 ApproximateNumberOfMessages,
88 ApproximateNumberOfMessagesNotVisible,
89 VisibilityTimeout,
90 CreatedTimestamp,
91 LastModifiedTimestamp,
92 Policy
93 ReceiveMessageWaitTimeSeconds
94 :rtype: Attribute object
95 :return: An Attribute object which is a mapping type holding the
96 requested name/value pairs
97 """
98 return self.connection.get_queue_attributes(self, attributes)
99
100 def set_attribute(self, attribute, value):
101 """
102 Set a new value for an attribute of the Queue.
103
104 :type attribute: String
105 :param attribute: The name of the attribute you want to set. The
106 only valid value at this time is: VisibilityTimeout
107 :type value: int
108 :param value: The new value for the attribute.
109 For VisibilityTimeout the value must be an
110 integer number of seconds from 0 to 86400.
111
112 :rtype: bool
113 :return: True if successful, otherwise False.
114 """
115 return self.connection.set_queue_attribute(self, attribute, value)
116
117 def get_timeout(self):
118 """
119 Get the visibility timeout for the queue.
120
121 :rtype: int
122 :return: The number of seconds as an integer.
123 """
124 a = self.get_attributes('VisibilityTimeout')
125 return int(a['VisibilityTimeout'])
126
127 def set_timeout(self, visibility_timeout):
128 """
129 Set the visibility timeout for the queue.
130
131 :type visibility_timeout: int
132 :param visibility_timeout: The desired timeout in seconds
133 """
134 retval = self.set_attribute('VisibilityTimeout', visibility_timeout)
135 if retval:
136 self.visibility_timeout = visibility_timeout
137 return retval
138
139 def add_permission(self, label, aws_account_id, action_name):
140 """
141 Add a permission to a queue.
142
143 :type label: str or unicode
144 :param label: A unique identification of the permission you are setting.
145 Maximum of 80 characters ``[0-9a-zA-Z_-]``
146 Example, AliceSendMessage
147
148 :type aws_account_id: str or unicode
149 :param principal_id: The AWS account number of the principal who
150 will be given permission. The principal must have an AWS account,
151 but does not need to be signed up for Amazon SQS. For information
152 about locating the AWS account identification.
153
154 :type action_name: str or unicode
155 :param action_name: The action. Valid choices are:
156 *|SendMessage|ReceiveMessage|DeleteMessage|
157 ChangeMessageVisibility|GetQueueAttributes
158
159 :rtype: bool
160 :return: True if successful, False otherwise.
161
162 """
163 return self.connection.add_permission(self, label, aws_account_id,
164 action_name)
165
166 def remove_permission(self, label):
167 """
168 Remove a permission from a queue.
169
170 :type label: str or unicode
171 :param label: The unique label associated with the permission
172 being removed.
173
174 :rtype: bool
175 :return: True if successful, False otherwise.
176 """
177 return self.connection.remove_permission(self, label)
178
179 def read(self, visibility_timeout=None, wait_time_seconds=None):
180 """
181 Read a single message from the queue.
182
183 :type visibility_timeout: int
184 :param visibility_timeout: The timeout for this message in seconds
185
186 :type wait_time_seconds: int
187 :param wait_time_seconds: The duration (in seconds) for which the call
188 will wait for a message to arrive in the queue before returning.
189 If a message is available, the call will return sooner than
190 wait_time_seconds.
191
192 :rtype: :class:`boto.sqs.message.Message`
193 :return: A single message or None if queue is empty
194 """
195 rs = self.get_messages(1, visibility_timeout,
196 wait_time_seconds=wait_time_seconds)
197 if len(rs) == 1:
198 return rs[0]
199 else:
200 return None
201
202 def write(self, message, delay_seconds=None):
203 """
204 Add a single message to the queue.
205
206 :type message: Message
207 :param message: The message to be written to the queue
208
209 :rtype: :class:`boto.sqs.message.Message`
210 :return: The :class:`boto.sqs.message.Message` object that was written.
211 """
212 new_msg = self.connection.send_message(self,
213 message.get_body_encoded(),
214 delay_seconds)
215 message.id = new_msg.id
216 message.md5 = new_msg.md5
217 return message
218
219 def write_batch(self, messages):
220 """
221 Delivers up to 10 messages in a single request.
222
223 :type messages: List of lists.
224 :param messages: A list of lists or tuples. Each inner
225 tuple represents a single message to be written
226 and consists of and ID (string) that must be unique
227 within the list of messages, the message body itself
228 which can be a maximum of 64K in length, and an
229 integer which represents the delay time (in seconds)
230 for the message (0-900) before the message will
231 be delivered to the queue.
232 """
233 return self.connection.send_message_batch(self, messages)
234
235 def new_message(self, body=''):
236 """
237 Create new message of appropriate class.
238
239 :type body: message body
240 :param body: The body of the newly created message (optional).
241
242 :rtype: :class:`boto.sqs.message.Message`
243 :return: A new Message object
244 """
245 m = self.message_class(self, body)
246 m.queue = self
247 return m
248
249 # get a variable number of messages, returns a list of messages
250 def get_messages(self, num_messages=1, visibility_timeout=None,
251 attributes=None, wait_time_seconds=None):
252 """
253 Get a variable number of messages.
254
255 :type num_messages: int
256 :param num_messages: The maximum number of messages to read from
257 the queue.
258
259 :type visibility_timeout: int
260 :param visibility_timeout: The VisibilityTimeout for the messages read.
261
262 :type attributes: str
263 :param attributes: The name of additional attribute to return
264 with response or All if you want all attributes. The
265 default is to return no additional attributes. Valid
266 values: All SenderId SentTimestamp ApproximateReceiveCount
267 ApproximateFirstReceiveTimestamp
268
269 :type wait_time_seconds: int
270 :param wait_time_seconds: The duration (in seconds) for which the call
271 will wait for a message to arrive in the queue before returning.
272 If a message is available, the call will return sooner than
273 wait_time_seconds.
274
275 :rtype: list
276 :return: A list of :class:`boto.sqs.message.Message` objects.
277 """
278 return self.connection.receive_message(
279 self, number_messages=num_messages,
280 visibility_timeout=visibility_timeout, attributes=attributes,
281 wait_time_seconds=wait_time_seconds)
282
283 def delete_message(self, message):
284 """
285 Delete a message from the queue.
286
287 :type message: :class:`boto.sqs.message.Message`
288 :param message: The :class:`boto.sqs.message.Message` object to delete.
289
290 :rtype: bool
291 :return: True if successful, False otherwise
292 """
293 return self.connection.delete_message(self, message)
294
295 def delete_message_batch(self, messages):
296 """
297 Deletes a list of messages in a single request.
298
299 :type messages: List of :class:`boto.sqs.message.Message` objects.
300 :param messages: A list of message objects.
301 """
302 return self.connection.delete_message_batch(self, messages)
303
304 def change_message_visibility_batch(self, messages):
305 """
306 A batch version of change_message_visibility that can act
307 on up to 10 messages at a time.
308
309 :type messages: List of tuples.
310 :param messages: A list of tuples where each tuple consists
311 of a :class:`boto.sqs.message.Message` object and an integer
312 that represents the new visibility timeout for that message.
313 """
314 return self.connection.change_message_visibility_batch(self, messages)
315
316 def delete(self):
317 """
318 Delete the queue.
319 """
320 return self.connection.delete_queue(self)
321
322 def clear(self, page_size=10, vtimeout=10):
323 """Utility function to remove all messages from a queue"""
324 n = 0
325 l = self.get_messages(page_size, vtimeout)
326 while l:
327 for m in l:
328 self.delete_message(m)
329 n += 1
330 l = self.get_messages(page_size, vtimeout)
331 return n
332
333 def count(self, page_size=10, vtimeout=10):
334 """
335 Utility function to count the number of messages in a queue.
336 Note: This function now calls GetQueueAttributes to obtain
337 an 'approximate' count of the number of messages in a queue.
338 """
339 a = self.get_attributes('ApproximateNumberOfMessages')
340 return int(a['ApproximateNumberOfMessages'])
341
342 def count_slow(self, page_size=10, vtimeout=10):
343 """
344 Deprecated. This is the old 'count' method that actually counts
345 the messages by reading them all. This gives an accurate count but
346 is very slow for queues with non-trivial number of messasges.
347 Instead, use get_attribute('ApproximateNumberOfMessages') to take
348 advantage of the new SQS capability. This is retained only for
349 the unit tests.
350 """
351 n = 0
352 l = self.get_messages(page_size, vtimeout)
353 while l:
354 for m in l:
355 n += 1
356 l = self.get_messages(page_size, vtimeout)
357 return n
358
359 def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
360 """Utility function to dump the messages in a queue to a file
361 NOTE: Page size must be < 10 else SQS errors"""
362 fp = open(file_name, 'wb')
363 n = 0
364 l = self.get_messages(page_size, vtimeout)
365 while l:
366 for m in l:
367 fp.write(m.get_body())
368 if sep:
369 fp.write(sep)
370 n += 1
371 l = self.get_messages(page_size, vtimeout)
372 fp.close()
373 return n
374
375 def save_to_file(self, fp, sep='\n'):
376 """
377 Read all messages from the queue and persist them to file-like object.
378 Messages are written to the file and the 'sep' string is written
379 in between messages. Messages are deleted from the queue after
380 being written to the file.
381 Returns the number of messages saved.
382 """
383 n = 0
384 m = self.read()
385 while m:
386 n += 1
387 fp.write(m.get_body())
388 if sep:
389 fp.write(sep)
390 self.delete_message(m)
391 m = self.read()
392 return n
393
394 def save_to_filename(self, file_name, sep='\n'):
395 """
396 Read all messages from the queue and persist them to local file.
397 Messages are written to the file and the 'sep' string is written
398 in between messages. Messages are deleted from the queue after
399 being written to the file.
400 Returns the number of messages saved.
401 """
402 fp = open(file_name, 'wb')
403 n = self.save_to_file(fp, sep)
404 fp.close()
405 return n
406
407 # for backwards compatibility
408 save = save_to_filename
409
410 def save_to_s3(self, bucket):
411 """
412 Read all messages from the queue and persist them to S3.
413 Messages are stored in the S3 bucket using a naming scheme of::
414
415 <queue_id>/<message_id>
416
417 Messages are deleted from the queue after being saved to S3.
418 Returns the number of messages saved.
419 """
420 n = 0
421 m = self.read()
422 while m:
423 n += 1
424 key = bucket.new_key('%s/%s' % (self.id, m.id))
425 key.set_contents_from_string(m.get_body())
426 self.delete_message(m)
427 m = self.read()
428 return n
429
430 def load_from_s3(self, bucket, prefix=None):
431 """
432 Load messages previously saved to S3.
433 """
434 n = 0
435 if prefix:
436 prefix = '%s/' % prefix
437 else:
438 prefix = '%s/' % self.id[1:]
439 rs = bucket.list(prefix=prefix)
440 for key in rs:
441 n += 1
442 m = self.new_message(key.get_contents_as_string())
443 self.write(m)
444 return n
445
446 def load_from_file(self, fp, sep='\n'):
447 """Utility function to load messages from a file-like object to a queue" ""
448 n = 0
449 body = ''
450 l = fp.readline()
451 while l:
452 if l == sep:
453 m = Message(self, body)
454 self.write(m)
455 n += 1
456 print 'writing message %d' % n
457 body = ''
458 else:
459 body = body + l
460 l = fp.readline()
461 return n
462
463 def load_from_filename(self, file_name, sep='\n'):
464 """Utility function to load messages from a local filename to a queue"""
465 fp = open(file_name, 'rb')
466 n = self.load_from_file(fp, sep)
467 fp.close()
468 return n
469
470 # for backward compatibility
471 load = load_from_filename
472
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698