| Index: third_party/boto/boto/sqs/connection.py
|
| ===================================================================
|
| --- third_party/boto/boto/sqs/connection.py (revision 33376)
|
| +++ third_party/boto/boto/sqs/connection.py (working copy)
|
| @@ -19,6 +19,7 @@
|
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
| # IN THE SOFTWARE.
|
|
|
| +import boto
|
| from boto.connection import AWSQueryConnection
|
| from boto.sqs.regioninfo import SQSRegionInfo
|
| from boto.sqs.queue import Queue
|
| @@ -32,9 +33,10 @@
|
| """
|
| A Connection to the SQS Service.
|
| """
|
| - DefaultRegionName = 'us-east-1'
|
| - DefaultRegionEndpoint = 'queue.amazonaws.com'
|
| - APIVersion = '2012-11-05'
|
| + DefaultRegionName = boto.config.get('Boto', 'sqs_region_name', 'us-east-1')
|
| + DefaultRegionEndpoint = boto.config.get('Boto', 'sqs_region_endpoint',
|
| + 'queue.amazonaws.com')
|
| + APIVersion = boto.config.get('Boto', 'sqs_version', '2012-11-05')
|
| DefaultContentType = 'text/plain'
|
| ResponseError = SQSError
|
| AuthServiceName = 'sqs'
|
| @@ -113,7 +115,7 @@
|
| Gets one or all attributes of a Queue
|
|
|
| :type queue: A Queue object
|
| - :param queue: The SQS queue to be deleted
|
| + :param queue: The SQS queue to get attributes for
|
|
|
| :type attribute: str
|
| :type attribute: The specific attribute requested. If not
|
| @@ -127,6 +129,7 @@
|
| * LastModifiedTimestamp
|
| * Policy
|
| * ReceiveMessageWaitTimeSeconds
|
| + * RedrivePolicy
|
|
|
| :rtype: :class:`boto.sqs.attributes.Attributes`
|
| :return: An Attributes object containing request value(s).
|
| @@ -141,7 +144,7 @@
|
|
|
| def receive_message(self, queue, number_messages=1,
|
| visibility_timeout=None, attributes=None,
|
| - wait_time_seconds=None):
|
| + wait_time_seconds=None, message_attributes=None):
|
| """
|
| Read messages from an SQS Queue.
|
|
|
| @@ -174,6 +177,11 @@
|
| If a message is available, the call will return sooner than
|
| wait_time_seconds.
|
|
|
| + :type message_attributes: list
|
| + :param message_attributes: The name(s) of additional message
|
| + attributes to return. The default is to return no additional
|
| + message attributes. Use ``['All']`` or ``['.*']`` to return all.
|
| +
|
| :rtype: list
|
| :return: A list of :class:`boto.sqs.message.Message` objects.
|
|
|
| @@ -185,6 +193,9 @@
|
| self.build_list_params(params, attributes, 'AttributeName')
|
| if wait_time_seconds is not None:
|
| params['WaitTimeSeconds'] = wait_time_seconds
|
| + if message_attributes is not None:
|
| + self.build_list_params(params, message_attributes,
|
| + 'MessageAttributeName')
|
| return self.get_list('ReceiveMessage', params,
|
| [('Message', queue.message_class)],
|
| queue.id, queue)
|
| @@ -241,10 +252,61 @@
|
| params = {'ReceiptHandle' : receipt_handle}
|
| return self.get_status('DeleteMessage', params, queue.id)
|
|
|
| - def send_message(self, queue, message_content, delay_seconds=None):
|
| + def send_message(self, queue, message_content, delay_seconds=None,
|
| + message_attributes=None):
|
| + """
|
| + Send a new message to the queue.
|
| +
|
| + :type queue: A :class:`boto.sqs.queue.Queue` object.
|
| + :param queue: The Queue to which the messages will be written.
|
| +
|
| + :type message_content: string
|
| + :param message_content: The body of the message
|
| +
|
| + :type delay_seconds: int
|
| + :param delay_seconds: Number of seconds (0 - 900) to delay this
|
| + message from being processed.
|
| +
|
| + :type message_attributes: dict
|
| + :param message_attributes: Message attributes to set. Should be
|
| + of the form:
|
| +
|
| + {
|
| + "name1": {
|
| + "data_type": "Number",
|
| + "string_value": "1"
|
| + },
|
| + "name2": {
|
| + "data_type": "String",
|
| + "string_value": "Bob"
|
| + }
|
| + }
|
| +
|
| + """
|
| params = {'MessageBody' : message_content}
|
| if delay_seconds:
|
| params['DelaySeconds'] = int(delay_seconds)
|
| +
|
| + if message_attributes is not None:
|
| + for i, name in enumerate(message_attributes.keys(), start=1):
|
| + attribute = message_attributes[name]
|
| + params['MessageAttribute.%s.Name' % i] = name
|
| + if 'data_type' in attribute:
|
| + params['MessageAttribute.%s.Value.DataType' % i] = \
|
| + attribute['data_type']
|
| + if 'string_value' in attribute:
|
| + params['MessageAttribute.%s.Value.StringValue' % i] = \
|
| + attribute['string_value']
|
| + if 'binary_value' in attribute:
|
| + params['MessageAttribute.%s.Value.BinaryValue' % i] = \
|
| + attribute['binary_value']
|
| + if 'string_list_value' in attribute:
|
| + params['MessageAttribute.%s.Value.StringListValue' % i] = \
|
| + attribute['string_list_value']
|
| + if 'binary_list_value' in attribute:
|
| + params['MessageAttribute.%s.Value.BinaryListValue' % i] = \
|
| + attribute['binary_list_value']
|
| +
|
| return self.get_object('SendMessage', params, Message,
|
| queue.id, verb='POST')
|
|
|
| @@ -260,19 +322,44 @@
|
| tuple represents a single message to be written
|
| and consists of and ID (string) that must be unique
|
| within the list of messages, the message body itself
|
| - which can be a maximum of 64K in length, and an
|
| + which can be a maximum of 64K in length, an
|
| integer which represents the delay time (in seconds)
|
| for the message (0-900) before the message will
|
| - be delivered to the queue.
|
| + be delivered to the queue, and an optional dict of
|
| + message attributes like those passed to ``send_message``
|
| + above.
|
| +
|
| """
|
| params = {}
|
| for i, msg in enumerate(messages):
|
| - p_name = 'SendMessageBatchRequestEntry.%i.Id' % (i+1)
|
| - params[p_name] = msg[0]
|
| - p_name = 'SendMessageBatchRequestEntry.%i.MessageBody' % (i+1)
|
| - params[p_name] = msg[1]
|
| - p_name = 'SendMessageBatchRequestEntry.%i.DelaySeconds' % (i+1)
|
| - params[p_name] = msg[2]
|
| + base = 'SendMessageBatchRequestEntry.%i' % (i + 1)
|
| + params['%s.Id' % base] = msg[0]
|
| + params['%s.MessageBody' % base] = msg[1]
|
| + params['%s.DelaySeconds' % base] = msg[2]
|
| + if len(msg) > 3:
|
| + base += '.MessageAttribute'
|
| + for j, name in enumerate(msg[3].keys()):
|
| + attribute = msg[3][name]
|
| +
|
| + p_name = '%s.%i.Name' % (base, j + 1)
|
| + params[p_name] = name
|
| +
|
| + if 'data_type' in attribute:
|
| + p_name = '%s.%i.DataType' % (base, j + 1)
|
| + params[p_name] = attribute['data_type']
|
| + if 'string_value' in attribute:
|
| + p_name = '%s.%i.StringValue' % (base, j + 1)
|
| + params[p_name] = attribute['string_value']
|
| + if 'binary_value' in attribute:
|
| + p_name = '%s.%i.BinaryValue' % (base, j + 1)
|
| + params[p_name] = attribute['binary_value']
|
| + if 'string_list_value' in attribute:
|
| + p_name = '%s.%i.StringListValue' % (base, j + 1)
|
| + params[p_name] = attribute['string_list_value']
|
| + if 'binary_list_value' in attribute:
|
| + p_name = '%s.%i.BinaryListValue' % (base, j + 1)
|
| + params[p_name] = attribute['binary_list_value']
|
| +
|
| return self.get_object('SendMessageBatch', params, BatchResults,
|
| queue.id, verb='POST')
|
|
|
| @@ -357,6 +444,19 @@
|
|
|
| lookup = get_queue
|
|
|
| + def get_dead_letter_source_queues(self, queue):
|
| + """
|
| + Retrieves the dead letter source queues for a given queue.
|
| +
|
| + :type queue: A :class:`boto.sqs.queue.Queue` object.
|
| + :param queue: The queue for which to get DL source queues
|
| + :rtype: list
|
| + :returns: A list of :py:class:`boto.sqs.queue.Queue` instances.
|
| + """
|
| + params = {'QueueUrl': queue.url}
|
| + return self.get_list('ListDeadLetterSourceQueues', params,
|
| + [('QueueUrl', Queue)])
|
| +
|
| #
|
| # Permissions methods
|
| #
|
|
|