| OLD | NEW | 
|---|
| 1 # Copyright (c) 2010 Spotify AB | 1 # Copyright (c) 2010 Spotify AB | 
|  | 2 # Copyright (c) 2010-2011 Yelp | 
| 2 # | 3 # | 
| 3 # Permission is hereby granted, free of charge, to any person obtaining a | 4 # Permission is hereby granted, free of charge, to any person obtaining a | 
| 4 # copy of this software and associated documentation files (the | 5 # copy of this software and associated documentation files (the | 
| 5 # "Software"), to deal in the Software without restriction, including | 6 # "Software"), to deal in the Software without restriction, including | 
| 6 # without limitation the rights to use, copy, modify, merge, publish, dis- | 7 # without limitation the rights to use, copy, modify, merge, publish, dis- | 
| 7 # tribute, sublicense, and/or sell copies of the Software, and to permit | 8 # tribute, sublicense, and/or sell copies of the Software, and to permit | 
| 8 # persons to whom the Software is furnished to do so, subject to the fol- | 9 # persons to whom the Software is furnished to do so, subject to the fol- | 
| 9 # lowing conditions: | 10 # lowing conditions: | 
| 10 # | 11 # | 
| 11 # The above copyright notice and this permission notice shall be included | 12 # The above copyright notice and this permission notice shall be included | 
| 12 # in all copies or substantial portions of the Software. | 13 # in all copies or substantial portions of the Software. | 
| 13 # | 14 # | 
| 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | 
| 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | 
| 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | 
| 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | 
| 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | 
| 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | 
| 20 # IN THE SOFTWARE. | 21 # IN THE SOFTWARE. | 
| 21 | 22 | 
| 22 """ | 23 """ | 
| 23 Represents a connection to the EMR service | 24 Represents a connection to the EMR service | 
| 24 """ | 25 """ | 
| 25 import types | 26 import types | 
| 26 | 27 | 
| 27 import boto | 28 import boto | 
|  | 29 import boto.utils | 
| 28 from boto.ec2.regioninfo import RegionInfo | 30 from boto.ec2.regioninfo import RegionInfo | 
| 29 from boto.emr.emrobject import JobFlow, RunJobFlowResponse | 31 from boto.emr.emrobject import JobFlow, RunJobFlowResponse | 
|  | 32 from boto.emr.emrobject import AddInstanceGroupsResponse, ModifyInstanceGroupsRe
     sponse | 
| 30 from boto.emr.step import JarStep | 33 from boto.emr.step import JarStep | 
| 31 from boto.connection import AWSQueryConnection | 34 from boto.connection import AWSQueryConnection | 
| 32 from boto.exception import EmrResponseError | 35 from boto.exception import EmrResponseError | 
| 33 | 36 | 
| 34 class EmrConnection(AWSQueryConnection): | 37 class EmrConnection(AWSQueryConnection): | 
| 35 | 38 | 
| 36     APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31') | 39     APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31') | 
| 37     DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1') | 40     DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1') | 
| 38     DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint', | 41     DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint', | 
| 39                                             'elasticmapreduce.amazonaws.com') | 42                                             'elasticmapreduce.amazonaws.com') | 
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 87         :type created_before: datetime | 90         :type created_before: datetime | 
| 88         :param created_before: Bound on job flow creation time | 91         :param created_before: Bound on job flow creation time | 
| 89         """ | 92         """ | 
| 90         params = {} | 93         params = {} | 
| 91 | 94 | 
| 92         if states: | 95         if states: | 
| 93             self.build_list_params(params, states, 'JobFlowStates.member') | 96             self.build_list_params(params, states, 'JobFlowStates.member') | 
| 94         if jobflow_ids: | 97         if jobflow_ids: | 
| 95             self.build_list_params(params, jobflow_ids, 'JobFlowIds.member') | 98             self.build_list_params(params, jobflow_ids, 'JobFlowIds.member') | 
| 96         if created_after: | 99         if created_after: | 
| 97             params['CreatedAfter'] = created_after.strftime('%Y-%m-%dT%H:%M:%S') | 100             params['CreatedAfter'] = created_after.strftime( | 
|  | 101                 boto.utils.ISO8601) | 
| 98         if created_before: | 102         if created_before: | 
| 99             params['CreatedBefore'] = created_before.strftime('%Y-%m-%dT%H:%M:%S
     ') | 103             params['CreatedBefore'] = created_before.strftime( | 
|  | 104                 boto.utils.ISO8601) | 
| 100 | 105 | 
| 101         return self.get_list('DescribeJobFlows', params, [('member', JobFlow)]) | 106         return self.get_list('DescribeJobFlows', params, [('member', JobFlow)]) | 
| 102 | 107 | 
| 103     def terminate_jobflow(self, jobflow_id): | 108     def terminate_jobflow(self, jobflow_id): | 
| 104         """ | 109         """ | 
| 105         Terminate an Elastic MapReduce job flow | 110         Terminate an Elastic MapReduce job flow | 
| 106 | 111 | 
| 107         :type jobflow_id: str | 112         :type jobflow_id: str | 
| 108         :param jobflow_id: A jobflow id | 113         :param jobflow_id: A jobflow id | 
| 109         """ | 114         """ | 
| 110         self.terminate_jobflows([jobflow_id]) | 115         self.terminate_jobflows([jobflow_id]) | 
| 111 | 116 | 
| 112     def terminate_jobflows(self, jobflow_ids): | 117     def terminate_jobflows(self, jobflow_ids): | 
| 113         """ | 118         """ | 
| 114         Terminate an Elastic MapReduce job flow | 119         Terminate an Elastic MapReduce job flow | 
| 115 | 120 | 
| 116         :type jobflow_ids: list | 121         :type jobflow_ids: list | 
| 117         :param jobflow_ids: A list of job flow IDs | 122         :param jobflow_ids: A list of job flow IDs | 
| 118         """ | 123         """ | 
| 119         params = {} | 124         params = {} | 
| 120         self.build_list_params(params, jobflow_ids, 'JobFlowIds.member') | 125         self.build_list_params(params, jobflow_ids, 'JobFlowIds.member') | 
| 121         return self.get_status('TerminateJobFlows', params) | 126         return self.get_status('TerminateJobFlows', params, verb='POST') | 
| 122 | 127 | 
| 123     def add_jobflow_steps(self, jobflow_id, steps): | 128     def add_jobflow_steps(self, jobflow_id, steps): | 
| 124         """ | 129         """ | 
| 125         Adds steps to a jobflow | 130         Adds steps to a jobflow | 
| 126 | 131 | 
| 127         :type jobflow_id: str | 132         :type jobflow_id: str | 
| 128         :param jobflow_id: The job flow id | 133         :param jobflow_id: The job flow id | 
| 129         :type steps: list(boto.emr.Step) | 134         :type steps: list(boto.emr.Step) | 
| 130         :param steps: A list of steps to add to the job | 135         :param steps: A list of steps to add to the job | 
| 131         """ | 136         """ | 
| 132         if type(steps) != types.ListType: | 137         if type(steps) != types.ListType: | 
| 133             steps = [steps] | 138             steps = [steps] | 
| 134         params = {} | 139         params = {} | 
| 135         params['JobFlowId'] = jobflow_id | 140         params['JobFlowId'] = jobflow_id | 
| 136 | 141 | 
| 137         # Step args | 142         # Step args | 
| 138         step_args = [self._build_step_args(step) for step in steps] | 143         step_args = [self._build_step_args(step) for step in steps] | 
| 139         params.update(self._build_step_list(step_args)) | 144         params.update(self._build_step_list(step_args)) | 
| 140 | 145 | 
| 141         return self.get_object('AddJobFlowSteps', params, RunJobFlowResponse) | 146         return self.get_object( | 
|  | 147             'AddJobFlowSteps', params, RunJobFlowResponse, verb='POST') | 
|  | 148 | 
|  | 149     def add_instance_groups(self, jobflow_id, instance_groups): | 
|  | 150         """ | 
|  | 151         Adds instance groups to a running cluster. | 
|  | 152 | 
|  | 153         :type jobflow_id: str | 
|  | 154         :param jobflow_id: The id of the jobflow which will take the new instanc
     e groups | 
|  | 155         :type instance_groups: list(boto.emr.InstanceGroup) | 
|  | 156         :param instance_groups: A list of instance groups to add to the job | 
|  | 157         """ | 
|  | 158         if type(instance_groups) != types.ListType: | 
|  | 159             instance_groups = [instance_groups] | 
|  | 160         params = {} | 
|  | 161         params['JobFlowId'] = jobflow_id | 
|  | 162         params.update(self._build_instance_group_list_args(instance_groups)) | 
|  | 163 | 
|  | 164         return self.get_object('AddInstanceGroups', params, AddInstanceGroupsRes
     ponse, verb='POST') | 
|  | 165 | 
|  | 166     def modify_instance_groups(self, instance_group_ids, new_sizes): | 
|  | 167         """ | 
|  | 168         Modify the number of nodes and configuration settings in an instance gro
     up. | 
|  | 169 | 
|  | 170         :type instance_group_ids: list(str) | 
|  | 171         :param instance_group_ids: A list of the ID's of the instance groups to 
     be modified | 
|  | 172         :type new_sizes: list(int) | 
|  | 173         :param new_sizes: A list of the new sizes for each instance group | 
|  | 174         """ | 
|  | 175         if type(instance_group_ids) != types.ListType: | 
|  | 176             instance_group_ids = [instance_group_ids] | 
|  | 177         if type(new_sizes) != types.ListType: | 
|  | 178             new_sizes = [new_sizes] | 
|  | 179 | 
|  | 180         instance_groups = zip(instance_group_ids, new_sizes) | 
|  | 181 | 
|  | 182         params = {} | 
|  | 183         for k, ig in enumerate(instance_groups): | 
|  | 184             #could be wrong - the example amazon gives uses InstanceRequestCount
     , | 
|  | 185             #while the api documentation says InstanceCount | 
|  | 186             params['InstanceGroups.member.%d.InstanceGroupId' % (k+1) ] = ig[0] | 
|  | 187             params['InstanceGroups.member.%d.InstanceCount' % (k+1) ] = ig[1] | 
|  | 188 | 
|  | 189         return self.get_object('ModifyInstanceGroups', params, ModifyInstanceGro
     upsResponse, verb='POST') | 
| 142 | 190 | 
| 143     def run_jobflow(self, name, log_uri, ec2_keyname=None, availability_zone=Non
     e, | 191     def run_jobflow(self, name, log_uri, ec2_keyname=None, availability_zone=Non
     e, | 
| 144                     master_instance_type='m1.small', | 192                     master_instance_type='m1.small', | 
| 145                     slave_instance_type='m1.small', num_instances=1, | 193                     slave_instance_type='m1.small', num_instances=1, | 
| 146                     action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False, | 194                     action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False, | 
| 147                     enable_debugging=False, | 195                     enable_debugging=False, | 
| 148                     hadoop_version='0.18', | 196                     hadoop_version='0.20', | 
| 149                     steps=[], | 197                     steps=[], | 
| 150                     bootstrap_actions=[]): | 198                     bootstrap_actions=[], | 
|  | 199                     instance_groups=None, | 
|  | 200                     additional_info=None): | 
| 151         """ | 201         """ | 
| 152         Runs a job flow | 202         Runs a job flow | 
| 153 | 203 | 
| 154         :type name: str | 204         :type name: str | 
| 155         :param name: Name of the job flow | 205         :param name: Name of the job flow | 
| 156         :type log_uri: str | 206         :type log_uri: str | 
| 157         :param log_uri: URI of the S3 bucket to place logs | 207         :param log_uri: URI of the S3 bucket to place logs | 
| 158         :type ec2_keyname: str | 208         :type ec2_keyname: str | 
| 159         :param ec2_keyname: EC2 key used for the instances | 209         :param ec2_keyname: EC2 key used for the instances | 
| 160         :type availability_zone: str | 210         :type availability_zone: str | 
| 161         :param availability_zone: EC2 availability zone of the cluster | 211         :param availability_zone: EC2 availability zone of the cluster | 
| 162         :type master_instance_type: str | 212         :type master_instance_type: str | 
| 163         :param master_instance_type: EC2 instance type of the master | 213         :param master_instance_type: EC2 instance type of the master | 
| 164         :type slave_instance_type: str | 214         :type slave_instance_type: str | 
| 165         :param slave_instance_type: EC2 instance type of the slave nodes | 215         :param slave_instance_type: EC2 instance type of the slave nodes | 
| 166         :type num_instances: int | 216         :type num_instances: int | 
| 167         :param num_instances: Number of instances in the Hadoop cluster | 217         :param num_instances: Number of instances in the Hadoop cluster | 
| 168         :type action_on_failure: str | 218         :type action_on_failure: str | 
| 169         :param action_on_failure: Action to take if a step terminates | 219         :param action_on_failure: Action to take if a step terminates | 
| 170         :type keep_alive: bool | 220         :type keep_alive: bool | 
| 171         :param keep_alive: Denotes whether the cluster should stay alive upon co
     mpletion | 221         :param keep_alive: Denotes whether the cluster should stay alive upon co
     mpletion | 
| 172         :type enable_debugging: bool | 222         :type enable_debugging: bool | 
| 173         :param enable_debugging: Denotes whether AWS console debugging should be
      enabled. | 223         :param enable_debugging: Denotes whether AWS console debugging should be
      enabled. | 
| 174         :type steps: list(boto.emr.Step) | 224         :type steps: list(boto.emr.Step) | 
| 175         :param steps: List of steps to add with the job | 225         :param steps: List of steps to add with the job | 
| 176 | 226         :type bootstrap_actions: list(boto.emr.BootstrapAction) | 
|  | 227         :param bootstrap_actions: List of bootstrap actions that run before Hado
     op starts. | 
|  | 228         :type instance_groups: list(boto.emr.InstanceGroup) | 
|  | 229         :param instance_groups: Optional list of instance groups to use when cre
     ating | 
|  | 230                       this job. NB: When provided, this argument supersedes | 
|  | 231                       num_instances and master/slave_instance_type. | 
|  | 232         :type additional_info: JSON str | 
|  | 233         :param additional_info: A JSON string for selecting additional features | 
| 177         :rtype: str | 234         :rtype: str | 
| 178         :return: The jobflow id | 235         :return: The jobflow id | 
| 179         """ | 236         """ | 
| 180         params = {} | 237         params = {} | 
| 181         if action_on_failure: | 238         if action_on_failure: | 
| 182             params['ActionOnFailure'] = action_on_failure | 239             params['ActionOnFailure'] = action_on_failure | 
| 183         params['Name'] = name | 240         params['Name'] = name | 
| 184         params['LogUri'] = log_uri | 241         params['LogUri'] = log_uri | 
| 185 | 242 | 
| 186         # Instance args | 243         # Common instance args | 
| 187         instance_params = self._build_instance_args(ec2_keyname, availability_zo
     ne, | 244         common_params = self._build_instance_common_args(ec2_keyname, | 
| 188                                                     master_instance_type, slave_
     instance_type, | 245                                                          availability_zone, | 
| 189                                                     num_instances, keep_alive, h
     adoop_version) | 246                                                          keep_alive, hadoop_vers
     ion) | 
| 190         params.update(instance_params) | 247         params.update(common_params) | 
|  | 248 | 
|  | 249         # NB: according to the AWS API's error message, we must | 
|  | 250         # "configure instances either using instance count, master and | 
|  | 251         # slave instance type or instance groups but not both." | 
|  | 252         # | 
|  | 253         # Thus we switch here on the truthiness of instance_groups. | 
|  | 254         if not instance_groups: | 
|  | 255             # Instance args (the common case) | 
|  | 256             instance_params = self._build_instance_count_and_type_args( | 
|  | 257                                                         master_instance_type, | 
|  | 258                                                         slave_instance_type, | 
|  | 259                                                         num_instances) | 
|  | 260             params.update(instance_params) | 
|  | 261         else: | 
|  | 262             # Instance group args (for spot instances or a heterogenous cluster) | 
|  | 263             list_args = self._build_instance_group_list_args(instance_groups) | 
|  | 264             instance_params = dict( | 
|  | 265                 ('Instances.%s' % k, v) for k, v in list_args.iteritems() | 
|  | 266                 ) | 
|  | 267             params.update(instance_params) | 
| 191 | 268 | 
| 192         # Debugging step from EMR API docs | 269         # Debugging step from EMR API docs | 
| 193         if enable_debugging: | 270         if enable_debugging: | 
| 194             debugging_step = JarStep(name='Setup Hadoop Debugging', | 271             debugging_step = JarStep(name='Setup Hadoop Debugging', | 
| 195                                      action_on_failure='TERMINATE_JOB_FLOW', | 272                                      action_on_failure='TERMINATE_JOB_FLOW', | 
| 196                                      main_class=None, | 273                                      main_class=None, | 
| 197                                      jar=self.DebuggingJar, | 274                                      jar=self.DebuggingJar, | 
| 198                                      step_args=self.DebuggingArgs) | 275                                      step_args=self.DebuggingArgs) | 
| 199             steps.insert(0, debugging_step) | 276             steps.insert(0, debugging_step) | 
| 200 | 277 | 
| 201         # Step args | 278         # Step args | 
| 202         if steps: | 279         if steps: | 
| 203             step_args = [self._build_step_args(step) for step in steps] | 280             step_args = [self._build_step_args(step) for step in steps] | 
| 204             params.update(self._build_step_list(step_args)) | 281             params.update(self._build_step_list(step_args)) | 
| 205 | 282 | 
| 206         if bootstrap_actions: | 283         if bootstrap_actions: | 
| 207             bootstrap_action_args = [self._build_bootstrap_action_args(bootstrap
     _action) for bootstrap_action in bootstrap_actions] | 284             bootstrap_action_args = [self._build_bootstrap_action_args(bootstrap
     _action) for bootstrap_action in bootstrap_actions] | 
| 208             params.update(self._build_bootstrap_action_list(bootstrap_action_arg
     s)) | 285             params.update(self._build_bootstrap_action_list(bootstrap_action_arg
     s)) | 
| 209 | 286 | 
| 210         response = self.get_object('RunJobFlow', params, RunJobFlowResponse) | 287         if additional_info is not None: | 
|  | 288             params['AdditionalInfo'] = additional_info | 
|  | 289 | 
|  | 290         response = self.get_object( | 
|  | 291             'RunJobFlow', params, RunJobFlowResponse, verb='POST') | 
| 211         return response.jobflowid | 292         return response.jobflowid | 
| 212 | 293 | 
|  | 294     def set_termination_protection(self, jobflow_id, termination_protection_stat
     us): | 
|  | 295         """ | 
|  | 296         Set termination protection on specified Elastic MapReduce job flows | 
|  | 297 | 
|  | 298         :type jobflow_ids: list or str | 
|  | 299         :param jobflow_ids: A list of job flow IDs | 
|  | 300         :type termination_protection_status: bool | 
|  | 301         :param termination_protection_status: Termination protection status | 
|  | 302         """ | 
|  | 303         assert termination_protection_status in (True, False) | 
|  | 304 | 
|  | 305         params = {} | 
|  | 306         params['TerminationProtected'] = (termination_protection_status and "tru
     e") or "false" | 
|  | 307         self.build_list_params(params, [jobflow_id], 'JobFlowIds.member') | 
|  | 308 | 
|  | 309         return self.get_status('SetTerminationProtection', params, verb='POST') | 
|  | 310 | 
|  | 311 | 
| 213     def _build_bootstrap_action_args(self, bootstrap_action): | 312     def _build_bootstrap_action_args(self, bootstrap_action): | 
| 214         bootstrap_action_params = {} | 313         bootstrap_action_params = {} | 
| 215         bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action
     .path | 314         bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action
     .path | 
| 216 | 315 | 
| 217         try: | 316         try: | 
| 218             bootstrap_action_params['Name'] = bootstrap_action.name | 317             bootstrap_action_params['Name'] = bootstrap_action.name | 
| 219         except AttributeError: | 318         except AttributeError: | 
| 220             pass | 319             pass | 
| 221 | 320 | 
| 222         args = bootstrap_action.args() | 321         args = bootstrap_action.args() | 
| (...skipping 18 matching lines...) Expand all  Loading... | 
| 241         step_params['Name'] = step.name | 340         step_params['Name'] = step.name | 
| 242         return step_params | 341         return step_params | 
| 243 | 342 | 
| 244     def _build_bootstrap_action_list(self, bootstrap_actions): | 343     def _build_bootstrap_action_list(self, bootstrap_actions): | 
| 245         if type(bootstrap_actions) != types.ListType: | 344         if type(bootstrap_actions) != types.ListType: | 
| 246             bootstrap_actions = [bootstrap_actions] | 345             bootstrap_actions = [bootstrap_actions] | 
| 247 | 346 | 
| 248         params = {} | 347         params = {} | 
| 249         for i, bootstrap_action in enumerate(bootstrap_actions): | 348         for i, bootstrap_action in enumerate(bootstrap_actions): | 
| 250             for key, value in bootstrap_action.iteritems(): | 349             for key, value in bootstrap_action.iteritems(): | 
| 251                 params['BootstrapActions.memeber.%s.%s' % (i + 1, key)] = value | 350                 params['BootstrapActions.member.%s.%s' % (i + 1, key)] = value | 
| 252         return params | 351         return params | 
| 253 | 352 | 
| 254     def _build_step_list(self, steps): | 353     def _build_step_list(self, steps): | 
| 255         if type(steps) != types.ListType: | 354         if type(steps) != types.ListType: | 
| 256             steps = [steps] | 355             steps = [steps] | 
| 257 | 356 | 
| 258         params = {} | 357         params = {} | 
| 259         for i, step in enumerate(steps): | 358         for i, step in enumerate(steps): | 
| 260             for key, value in step.iteritems(): | 359             for key, value in step.iteritems(): | 
| 261                 params['Steps.memeber.%s.%s' % (i+1, key)] = value | 360                 params['Steps.member.%s.%s' % (i+1, key)] = value | 
| 262         return params | 361         return params | 
| 263 | 362 | 
| 264     def _build_instance_args(self, ec2_keyname, availability_zone, master_instan
     ce_type, | 363     def _build_instance_common_args(self, ec2_keyname, availability_zone, | 
| 265                              slave_instance_type, num_instances, keep_alive, had
     oop_version): | 364                                     keep_alive, hadoop_version): | 
|  | 365         """ | 
|  | 366         Takes a number of parameters used when starting a jobflow (as | 
|  | 367         specified in run_jobflow() above). Returns a comparable dict for | 
|  | 368         use in making a RunJobFlow request. | 
|  | 369         """ | 
| 266         params = { | 370         params = { | 
| 267             'Instances.MasterInstanceType' : master_instance_type, |  | 
| 268             'Instances.SlaveInstanceType' : slave_instance_type, |  | 
| 269             'Instances.InstanceCount' : num_instances, |  | 
| 270             'Instances.KeepJobFlowAliveWhenNoSteps' : str(keep_alive).lower(), | 371             'Instances.KeepJobFlowAliveWhenNoSteps' : str(keep_alive).lower(), | 
| 271             'Instances.HadoopVersion' : hadoop_version | 372             'Instances.HadoopVersion' : hadoop_version | 
| 272         } | 373         } | 
| 273 | 374 | 
| 274         if ec2_keyname: | 375         if ec2_keyname: | 
| 275             params['Instances.Ec2KeyName'] = ec2_keyname | 376             params['Instances.Ec2KeyName'] = ec2_keyname | 
| 276         if availability_zone: | 377         if availability_zone: | 
| 277             params['Placement'] = availability_zone | 378             params['Instances.Placement.AvailabilityZone'] = availability_zone | 
| 278 | 379 | 
| 279         return params | 380         return params | 
| 280 | 381 | 
|  | 382     def _build_instance_count_and_type_args(self, master_instance_type, | 
|  | 383                                             slave_instance_type, num_instances): | 
|  | 384         """ | 
|  | 385         Takes a master instance type (string), a slave instance type | 
|  | 386         (string), and a number of instances. Returns a comparable dict | 
|  | 387         for use in making a RunJobFlow request. | 
|  | 388         """ | 
|  | 389         params = { | 
|  | 390             'Instances.MasterInstanceType' : master_instance_type, | 
|  | 391             'Instances.SlaveInstanceType' : slave_instance_type, | 
|  | 392             'Instances.InstanceCount' : num_instances, | 
|  | 393             } | 
|  | 394         return params | 
|  | 395 | 
|  | 396     def _build_instance_group_args(self, instance_group): | 
|  | 397         """ | 
|  | 398         Takes an InstanceGroup; returns a dict that, when its keys are | 
|  | 399         properly prefixed, can be used for describing InstanceGroups in | 
|  | 400         RunJobFlow or AddInstanceGroups requests. | 
|  | 401         """ | 
|  | 402         params = { | 
|  | 403             'InstanceCount' : instance_group.num_instances, | 
|  | 404             'InstanceRole' : instance_group.role, | 
|  | 405             'InstanceType' : instance_group.type, | 
|  | 406             'Name' : instance_group.name, | 
|  | 407             'Market' : instance_group.market | 
|  | 408         } | 
|  | 409         if instance_group.market == 'SPOT': | 
|  | 410             params['BidPrice'] = instance_group.bidprice | 
|  | 411         return params | 
|  | 412 | 
|  | 413     def _build_instance_group_list_args(self, instance_groups): | 
|  | 414         """ | 
|  | 415         Takes a list of InstanceGroups, or a single InstanceGroup. Returns | 
|  | 416         a comparable dict for use in making a RunJobFlow or AddInstanceGroups | 
|  | 417         request. | 
|  | 418         """ | 
|  | 419         if type(instance_groups) != types.ListType: | 
|  | 420             instance_groups = [instance_groups] | 
|  | 421 | 
|  | 422         params = {} | 
|  | 423         for i, instance_group in enumerate(instance_groups): | 
|  | 424             ig_dict = self._build_instance_group_args(instance_group) | 
|  | 425             for key, value in ig_dict.iteritems(): | 
|  | 426                 params['InstanceGroups.member.%d.%s' % (i+1, key)] = value | 
|  | 427         return params | 
| OLD | NEW | 
|---|