Index: tools/telemetry/third_party/gsutil/third_party/boto/boto/services/service.py |
diff --git a/tools/telemetry/third_party/gsutil/third_party/boto/boto/services/service.py b/tools/telemetry/third_party/gsutil/third_party/boto/boto/services/service.py |
deleted file mode 100644 |
index e1a04c8ec21c9e155da46ef78da1d28cf86c930a..0000000000000000000000000000000000000000 |
--- a/tools/telemetry/third_party/gsutil/third_party/boto/boto/services/service.py |
+++ /dev/null |
@@ -1,161 +0,0 @@ |
-# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/ |
-# |
-# Permission is hereby granted, free of charge, to any person obtaining a |
-# copy of this software and associated documentation files (the |
-# "Software"), to deal in the Software without restriction, including |
-# without limitation the rights to use, copy, modify, merge, publish, dis- |
-# tribute, sublicense, and/or sell copies of the Software, and to permit |
-# persons to whom the Software is furnished to do so, subject to the fol- |
-# lowing conditions: |
-# |
-# The above copyright notice and this permission notice shall be included |
-# in all copies or substantial portions of the Software. |
-# |
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
-# IN THE SOFTWARE. |
- |
-import boto |
-from boto.services.message import ServiceMessage |
-from boto.services.servicedef import ServiceDef |
-from boto.pyami.scriptbase import ScriptBase |
-from boto.utils import get_ts |
-import time |
-import os |
-import mimetypes |
- |
- |
-class Service(ScriptBase): |
- |
- # Time required to process a transaction |
- ProcessingTime = 60 |
- |
- def __init__(self, config_file=None, mimetype_files=None): |
- super(Service, self).__init__(config_file) |
- self.name = self.__class__.__name__ |
- self.working_dir = boto.config.get('Pyami', 'working_dir') |
- self.sd = ServiceDef(config_file) |
- self.retry_count = self.sd.getint('retry_count', 5) |
- self.loop_delay = self.sd.getint('loop_delay', 30) |
- self.processing_time = self.sd.getint('processing_time', 60) |
- self.input_queue = self.sd.get_obj('input_queue') |
- self.output_queue = self.sd.get_obj('output_queue') |
- self.output_domain = self.sd.get_obj('output_domain') |
- if mimetype_files: |
- mimetypes.init(mimetype_files) |
- |
- def split_key(key): |
- if key.find(';') < 0: |
- t = (key, '') |
- else: |
- key, type = key.split(';') |
- label, mtype = type.split('=') |
- t = (key, mtype) |
- return t |
- |
- def read_message(self): |
- boto.log.info('read_message') |
- message = self.input_queue.read(self.processing_time) |
- if message: |
- boto.log.info(message.get_body()) |
- key = 'Service-Read' |
- message[key] = get_ts() |
- return message |
- |
- # retrieve the source file from S3 |
- def get_file(self, message): |
- bucket_name = message['Bucket'] |
- key_name = message['InputKey'] |
- file_name = os.path.join(self.working_dir, message.get('OriginalFileName', 'in_file')) |
- boto.log.info('get_file: %s/%s to %s' % (bucket_name, key_name, file_name)) |
- bucket = boto.lookup('s3', bucket_name) |
- key = bucket.new_key(key_name) |
- key.get_contents_to_filename(os.path.join(self.working_dir, file_name)) |
- return file_name |
- |
- # process source file, return list of output files |
- def process_file(self, in_file_name, msg): |
- return [] |
- |
- # store result file in S3 |
- def put_file(self, bucket_name, file_path, key_name=None): |
- boto.log.info('putting file %s as %s.%s' % (file_path, bucket_name, key_name)) |
- bucket = boto.lookup('s3', bucket_name) |
- key = bucket.new_key(key_name) |
- key.set_contents_from_filename(file_path) |
- return key |
- |
- def save_results(self, results, input_message, output_message): |
- output_keys = [] |
- for file, type in results: |
- if 'OutputBucket' in input_message: |
- output_bucket = input_message['OutputBucket'] |
- else: |
- output_bucket = input_message['Bucket'] |
- key_name = os.path.split(file)[1] |
- key = self.put_file(output_bucket, file, key_name) |
- output_keys.append('%s;type=%s' % (key.name, type)) |
- output_message['OutputKey'] = ','.join(output_keys) |
- |
- # write message to each output queue |
- def write_message(self, message): |
- message['Service-Write'] = get_ts() |
- message['Server'] = self.name |
- if 'HOSTNAME' in os.environ: |
- message['Host'] = os.environ['HOSTNAME'] |
- else: |
- message['Host'] = 'unknown' |
- message['Instance-ID'] = self.instance_id |
- if self.output_queue: |
- boto.log.info('Writing message to SQS queue: %s' % self.output_queue.id) |
- self.output_queue.write(message) |
- if self.output_domain: |
- boto.log.info('Writing message to SDB domain: %s' % self.output_domain.name) |
- item_name = '/'.join([message['Service-Write'], message['Bucket'], message['InputKey']]) |
- self.output_domain.put_attributes(item_name, message) |
- |
- # delete message from input queue |
- def delete_message(self, message): |
- boto.log.info('deleting message from %s' % self.input_queue.id) |
- self.input_queue.delete_message(message) |
- |
- # to clean up any files, etc. after each iteration |
- def cleanup(self): |
- pass |
- |
- def shutdown(self): |
- on_completion = self.sd.get('on_completion', 'shutdown') |
- if on_completion == 'shutdown': |
- if self.instance_id: |
- time.sleep(60) |
- c = boto.connect_ec2() |
- c.terminate_instances([self.instance_id]) |
- |
- def main(self, notify=False): |
- self.notify('Service: %s Starting' % self.name) |
- empty_reads = 0 |
- while self.retry_count < 0 or empty_reads < self.retry_count: |
- try: |
- input_message = self.read_message() |
- if input_message: |
- empty_reads = 0 |
- output_message = ServiceMessage(None, input_message.get_body()) |
- input_file = self.get_file(input_message) |
- results = self.process_file(input_file, output_message) |
- self.save_results(results, input_message, output_message) |
- self.write_message(output_message) |
- self.delete_message(input_message) |
- self.cleanup() |
- else: |
- empty_reads += 1 |
- time.sleep(self.loop_delay) |
- except Exception: |
- boto.log.exception('Service Failed') |
- empty_reads += 1 |
- self.notify('Service: %s Shutting Down' % self.name) |
- self.shutdown() |
- |