Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(7)

Side by Side Diff: boto/emr/connection.py

Issue 8386013: Merging in latest boto. (Closed) Base URL: svn://svn.chromium.org/boto
Patch Set: Redoing vendor drop by deleting and then merging. Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « boto/emr/bootstrap_action.py ('k') | boto/emr/emrobject.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright (c) 2010 Spotify AB 1 # Copyright (c) 2010 Spotify AB
2 # Copyright (c) 2010-2011 Yelp
2 # 3 #
3 # Permission is hereby granted, free of charge, to any person obtaining a 4 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the 5 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including 6 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis- 7 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit 8 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol- 9 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions: 10 # lowing conditions:
10 # 11 #
11 # The above copyright notice and this permission notice shall be included 12 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software. 13 # in all copies or substantial portions of the Software.
13 # 14 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE. 21 # IN THE SOFTWARE.
21 22
22 """ 23 """
23 Represents a connection to the EMR service 24 Represents a connection to the EMR service
24 """ 25 """
25 import types 26 import types
26 27
27 import boto 28 import boto
29 import boto.utils
28 from boto.ec2.regioninfo import RegionInfo 30 from boto.ec2.regioninfo import RegionInfo
29 from boto.emr.emrobject import JobFlow, RunJobFlowResponse 31 from boto.emr.emrobject import JobFlow, RunJobFlowResponse
32 from boto.emr.emrobject import AddInstanceGroupsResponse, ModifyInstanceGroupsRe sponse
30 from boto.emr.step import JarStep 33 from boto.emr.step import JarStep
31 from boto.connection import AWSQueryConnection 34 from boto.connection import AWSQueryConnection
32 from boto.exception import EmrResponseError 35 from boto.exception import EmrResponseError
33 36
34 class EmrConnection(AWSQueryConnection): 37 class EmrConnection(AWSQueryConnection):
35 38
36 APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31') 39 APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31')
37 DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1') 40 DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1')
38 DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint', 41 DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint',
39 'elasticmapreduce.amazonaws.com') 42 'elasticmapreduce.amazonaws.com')
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
87 :type created_before: datetime 90 :type created_before: datetime
88 :param created_before: Bound on job flow creation time 91 :param created_before: Bound on job flow creation time
89 """ 92 """
90 params = {} 93 params = {}
91 94
92 if states: 95 if states:
93 self.build_list_params(params, states, 'JobFlowStates.member') 96 self.build_list_params(params, states, 'JobFlowStates.member')
94 if jobflow_ids: 97 if jobflow_ids:
95 self.build_list_params(params, jobflow_ids, 'JobFlowIds.member') 98 self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
96 if created_after: 99 if created_after:
97 params['CreatedAfter'] = created_after.strftime('%Y-%m-%dT%H:%M:%S') 100 params['CreatedAfter'] = created_after.strftime(
101 boto.utils.ISO8601)
98 if created_before: 102 if created_before:
99 params['CreatedBefore'] = created_before.strftime('%Y-%m-%dT%H:%M:%S ') 103 params['CreatedBefore'] = created_before.strftime(
104 boto.utils.ISO8601)
100 105
101 return self.get_list('DescribeJobFlows', params, [('member', JobFlow)]) 106 return self.get_list('DescribeJobFlows', params, [('member', JobFlow)])
102 107
103 def terminate_jobflow(self, jobflow_id): 108 def terminate_jobflow(self, jobflow_id):
104 """ 109 """
105 Terminate an Elastic MapReduce job flow 110 Terminate an Elastic MapReduce job flow
106 111
107 :type jobflow_id: str 112 :type jobflow_id: str
108 :param jobflow_id: A jobflow id 113 :param jobflow_id: A jobflow id
109 """ 114 """
110 self.terminate_jobflows([jobflow_id]) 115 self.terminate_jobflows([jobflow_id])
111 116
112 def terminate_jobflows(self, jobflow_ids): 117 def terminate_jobflows(self, jobflow_ids):
113 """ 118 """
114 Terminate an Elastic MapReduce job flow 119 Terminate an Elastic MapReduce job flow
115 120
116 :type jobflow_ids: list 121 :type jobflow_ids: list
117 :param jobflow_ids: A list of job flow IDs 122 :param jobflow_ids: A list of job flow IDs
118 """ 123 """
119 params = {} 124 params = {}
120 self.build_list_params(params, jobflow_ids, 'JobFlowIds.member') 125 self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
121 return self.get_status('TerminateJobFlows', params) 126 return self.get_status('TerminateJobFlows', params, verb='POST')
122 127
123 def add_jobflow_steps(self, jobflow_id, steps): 128 def add_jobflow_steps(self, jobflow_id, steps):
124 """ 129 """
125 Adds steps to a jobflow 130 Adds steps to a jobflow
126 131
127 :type jobflow_id: str 132 :type jobflow_id: str
128 :param jobflow_id: The job flow id 133 :param jobflow_id: The job flow id
129 :type steps: list(boto.emr.Step) 134 :type steps: list(boto.emr.Step)
130 :param steps: A list of steps to add to the job 135 :param steps: A list of steps to add to the job
131 """ 136 """
132 if type(steps) != types.ListType: 137 if type(steps) != types.ListType:
133 steps = [steps] 138 steps = [steps]
134 params = {} 139 params = {}
135 params['JobFlowId'] = jobflow_id 140 params['JobFlowId'] = jobflow_id
136 141
137 # Step args 142 # Step args
138 step_args = [self._build_step_args(step) for step in steps] 143 step_args = [self._build_step_args(step) for step in steps]
139 params.update(self._build_step_list(step_args)) 144 params.update(self._build_step_list(step_args))
140 145
141 return self.get_object('AddJobFlowSteps', params, RunJobFlowResponse) 146 return self.get_object(
147 'AddJobFlowSteps', params, RunJobFlowResponse, verb='POST')
148
149 def add_instance_groups(self, jobflow_id, instance_groups):
150 """
151 Adds instance groups to a running cluster.
152
153 :type jobflow_id: str
154 :param jobflow_id: The id of the jobflow which will take the new instanc e groups
155 :type instance_groups: list(boto.emr.InstanceGroup)
156 :param instance_groups: A list of instance groups to add to the job
157 """
158 if type(instance_groups) != types.ListType:
159 instance_groups = [instance_groups]
160 params = {}
161 params['JobFlowId'] = jobflow_id
162 params.update(self._build_instance_group_list_args(instance_groups))
163
164 return self.get_object('AddInstanceGroups', params, AddInstanceGroupsRes ponse, verb='POST')
165
166 def modify_instance_groups(self, instance_group_ids, new_sizes):
167 """
168 Modify the number of nodes and configuration settings in an instance gro up.
169
170 :type instance_group_ids: list(str)
171 :param instance_group_ids: A list of the ID's of the instance groups to be modified
172 :type new_sizes: list(int)
173 :param new_sizes: A list of the new sizes for each instance group
174 """
175 if type(instance_group_ids) != types.ListType:
176 instance_group_ids = [instance_group_ids]
177 if type(new_sizes) != types.ListType:
178 new_sizes = [new_sizes]
179
180 instance_groups = zip(instance_group_ids, new_sizes)
181
182 params = {}
183 for k, ig in enumerate(instance_groups):
184 #could be wrong - the example amazon gives uses InstanceRequestCount ,
185 #while the api documentation says InstanceCount
186 params['InstanceGroups.member.%d.InstanceGroupId' % (k+1) ] = ig[0]
187 params['InstanceGroups.member.%d.InstanceCount' % (k+1) ] = ig[1]
188
189 return self.get_object('ModifyInstanceGroups', params, ModifyInstanceGro upsResponse, verb='POST')
142 190
143 def run_jobflow(self, name, log_uri, ec2_keyname=None, availability_zone=Non e, 191 def run_jobflow(self, name, log_uri, ec2_keyname=None, availability_zone=Non e,
144 master_instance_type='m1.small', 192 master_instance_type='m1.small',
145 slave_instance_type='m1.small', num_instances=1, 193 slave_instance_type='m1.small', num_instances=1,
146 action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False, 194 action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
147 enable_debugging=False, 195 enable_debugging=False,
148 hadoop_version='0.18', 196 hadoop_version='0.20',
149 steps=[], 197 steps=[],
150 bootstrap_actions=[]): 198 bootstrap_actions=[],
199 instance_groups=None,
200 additional_info=None):
151 """ 201 """
152 Runs a job flow 202 Runs a job flow
153 203
154 :type name: str 204 :type name: str
155 :param name: Name of the job flow 205 :param name: Name of the job flow
156 :type log_uri: str 206 :type log_uri: str
157 :param log_uri: URI of the S3 bucket to place logs 207 :param log_uri: URI of the S3 bucket to place logs
158 :type ec2_keyname: str 208 :type ec2_keyname: str
159 :param ec2_keyname: EC2 key used for the instances 209 :param ec2_keyname: EC2 key used for the instances
160 :type availability_zone: str 210 :type availability_zone: str
161 :param availability_zone: EC2 availability zone of the cluster 211 :param availability_zone: EC2 availability zone of the cluster
162 :type master_instance_type: str 212 :type master_instance_type: str
163 :param master_instance_type: EC2 instance type of the master 213 :param master_instance_type: EC2 instance type of the master
164 :type slave_instance_type: str 214 :type slave_instance_type: str
165 :param slave_instance_type: EC2 instance type of the slave nodes 215 :param slave_instance_type: EC2 instance type of the slave nodes
166 :type num_instances: int 216 :type num_instances: int
167 :param num_instances: Number of instances in the Hadoop cluster 217 :param num_instances: Number of instances in the Hadoop cluster
168 :type action_on_failure: str 218 :type action_on_failure: str
169 :param action_on_failure: Action to take if a step terminates 219 :param action_on_failure: Action to take if a step terminates
170 :type keep_alive: bool 220 :type keep_alive: bool
171 :param keep_alive: Denotes whether the cluster should stay alive upon co mpletion 221 :param keep_alive: Denotes whether the cluster should stay alive upon co mpletion
172 :type enable_debugging: bool 222 :type enable_debugging: bool
173 :param enable_debugging: Denotes whether AWS console debugging should be enabled. 223 :param enable_debugging: Denotes whether AWS console debugging should be enabled.
174 :type steps: list(boto.emr.Step) 224 :type steps: list(boto.emr.Step)
175 :param steps: List of steps to add with the job 225 :param steps: List of steps to add with the job
176 226 :type bootstrap_actions: list(boto.emr.BootstrapAction)
227 :param bootstrap_actions: List of bootstrap actions that run before Hado op starts.
228 :type instance_groups: list(boto.emr.InstanceGroup)
229 :param instance_groups: Optional list of instance groups to use when cre ating
230 this job. NB: When provided, this argument supersedes
231 num_instances and master/slave_instance_type.
232 :type additional_info: JSON str
233 :param additional_info: A JSON string for selecting additional features
177 :rtype: str 234 :rtype: str
178 :return: The jobflow id 235 :return: The jobflow id
179 """ 236 """
180 params = {} 237 params = {}
181 if action_on_failure: 238 if action_on_failure:
182 params['ActionOnFailure'] = action_on_failure 239 params['ActionOnFailure'] = action_on_failure
183 params['Name'] = name 240 params['Name'] = name
184 params['LogUri'] = log_uri 241 params['LogUri'] = log_uri
185 242
186 # Instance args 243 # Common instance args
187 instance_params = self._build_instance_args(ec2_keyname, availability_zo ne, 244 common_params = self._build_instance_common_args(ec2_keyname,
188 master_instance_type, slave_ instance_type, 245 availability_zone,
189 num_instances, keep_alive, h adoop_version) 246 keep_alive, hadoop_vers ion)
190 params.update(instance_params) 247 params.update(common_params)
248
249 # NB: according to the AWS API's error message, we must
250 # "configure instances either using instance count, master and
251 # slave instance type or instance groups but not both."
252 #
253 # Thus we switch here on the truthiness of instance_groups.
254 if not instance_groups:
255 # Instance args (the common case)
256 instance_params = self._build_instance_count_and_type_args(
257 master_instance_type,
258 slave_instance_type,
259 num_instances)
260 params.update(instance_params)
261 else:
262 # Instance group args (for spot instances or a heterogenous cluster)
263 list_args = self._build_instance_group_list_args(instance_groups)
264 instance_params = dict(
265 ('Instances.%s' % k, v) for k, v in list_args.iteritems()
266 )
267 params.update(instance_params)
191 268
192 # Debugging step from EMR API docs 269 # Debugging step from EMR API docs
193 if enable_debugging: 270 if enable_debugging:
194 debugging_step = JarStep(name='Setup Hadoop Debugging', 271 debugging_step = JarStep(name='Setup Hadoop Debugging',
195 action_on_failure='TERMINATE_JOB_FLOW', 272 action_on_failure='TERMINATE_JOB_FLOW',
196 main_class=None, 273 main_class=None,
197 jar=self.DebuggingJar, 274 jar=self.DebuggingJar,
198 step_args=self.DebuggingArgs) 275 step_args=self.DebuggingArgs)
199 steps.insert(0, debugging_step) 276 steps.insert(0, debugging_step)
200 277
201 # Step args 278 # Step args
202 if steps: 279 if steps:
203 step_args = [self._build_step_args(step) for step in steps] 280 step_args = [self._build_step_args(step) for step in steps]
204 params.update(self._build_step_list(step_args)) 281 params.update(self._build_step_list(step_args))
205 282
206 if bootstrap_actions: 283 if bootstrap_actions:
207 bootstrap_action_args = [self._build_bootstrap_action_args(bootstrap _action) for bootstrap_action in bootstrap_actions] 284 bootstrap_action_args = [self._build_bootstrap_action_args(bootstrap _action) for bootstrap_action in bootstrap_actions]
208 params.update(self._build_bootstrap_action_list(bootstrap_action_arg s)) 285 params.update(self._build_bootstrap_action_list(bootstrap_action_arg s))
209 286
210 response = self.get_object('RunJobFlow', params, RunJobFlowResponse) 287 if additional_info is not None:
288 params['AdditionalInfo'] = additional_info
289
290 response = self.get_object(
291 'RunJobFlow', params, RunJobFlowResponse, verb='POST')
211 return response.jobflowid 292 return response.jobflowid
212 293
294 def set_termination_protection(self, jobflow_id, termination_protection_stat us):
295 """
296 Set termination protection on specified Elastic MapReduce job flows
297
298 :type jobflow_ids: list or str
299 :param jobflow_ids: A list of job flow IDs
300 :type termination_protection_status: bool
301 :param termination_protection_status: Termination protection status
302 """
303 assert termination_protection_status in (True, False)
304
305 params = {}
306 params['TerminationProtected'] = (termination_protection_status and "tru e") or "false"
307 self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')
308
309 return self.get_status('SetTerminationProtection', params, verb='POST')
310
311
213 def _build_bootstrap_action_args(self, bootstrap_action): 312 def _build_bootstrap_action_args(self, bootstrap_action):
214 bootstrap_action_params = {} 313 bootstrap_action_params = {}
215 bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action .path 314 bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action .path
216 315
217 try: 316 try:
218 bootstrap_action_params['Name'] = bootstrap_action.name 317 bootstrap_action_params['Name'] = bootstrap_action.name
219 except AttributeError: 318 except AttributeError:
220 pass 319 pass
221 320
222 args = bootstrap_action.args() 321 args = bootstrap_action.args()
(...skipping 18 matching lines...) Expand all
241 step_params['Name'] = step.name 340 step_params['Name'] = step.name
242 return step_params 341 return step_params
243 342
244 def _build_bootstrap_action_list(self, bootstrap_actions): 343 def _build_bootstrap_action_list(self, bootstrap_actions):
245 if type(bootstrap_actions) != types.ListType: 344 if type(bootstrap_actions) != types.ListType:
246 bootstrap_actions = [bootstrap_actions] 345 bootstrap_actions = [bootstrap_actions]
247 346
248 params = {} 347 params = {}
249 for i, bootstrap_action in enumerate(bootstrap_actions): 348 for i, bootstrap_action in enumerate(bootstrap_actions):
250 for key, value in bootstrap_action.iteritems(): 349 for key, value in bootstrap_action.iteritems():
251 params['BootstrapActions.memeber.%s.%s' % (i + 1, key)] = value 350 params['BootstrapActions.member.%s.%s' % (i + 1, key)] = value
252 return params 351 return params
253 352
254 def _build_step_list(self, steps): 353 def _build_step_list(self, steps):
255 if type(steps) != types.ListType: 354 if type(steps) != types.ListType:
256 steps = [steps] 355 steps = [steps]
257 356
258 params = {} 357 params = {}
259 for i, step in enumerate(steps): 358 for i, step in enumerate(steps):
260 for key, value in step.iteritems(): 359 for key, value in step.iteritems():
261 params['Steps.memeber.%s.%s' % (i+1, key)] = value 360 params['Steps.member.%s.%s' % (i+1, key)] = value
262 return params 361 return params
263 362
264 def _build_instance_args(self, ec2_keyname, availability_zone, master_instan ce_type, 363 def _build_instance_common_args(self, ec2_keyname, availability_zone,
265 slave_instance_type, num_instances, keep_alive, had oop_version): 364 keep_alive, hadoop_version):
365 """
366 Takes a number of parameters used when starting a jobflow (as
367 specified in run_jobflow() above). Returns a comparable dict for
368 use in making a RunJobFlow request.
369 """
266 params = { 370 params = {
267 'Instances.MasterInstanceType' : master_instance_type,
268 'Instances.SlaveInstanceType' : slave_instance_type,
269 'Instances.InstanceCount' : num_instances,
270 'Instances.KeepJobFlowAliveWhenNoSteps' : str(keep_alive).lower(), 371 'Instances.KeepJobFlowAliveWhenNoSteps' : str(keep_alive).lower(),
271 'Instances.HadoopVersion' : hadoop_version 372 'Instances.HadoopVersion' : hadoop_version
272 } 373 }
273 374
274 if ec2_keyname: 375 if ec2_keyname:
275 params['Instances.Ec2KeyName'] = ec2_keyname 376 params['Instances.Ec2KeyName'] = ec2_keyname
276 if availability_zone: 377 if availability_zone:
277 params['Placement'] = availability_zone 378 params['Instances.Placement.AvailabilityZone'] = availability_zone
278 379
279 return params 380 return params
280 381
382 def _build_instance_count_and_type_args(self, master_instance_type,
383 slave_instance_type, num_instances):
384 """
385 Takes a master instance type (string), a slave instance type
386 (string), and a number of instances. Returns a comparable dict
387 for use in making a RunJobFlow request.
388 """
389 params = {
390 'Instances.MasterInstanceType' : master_instance_type,
391 'Instances.SlaveInstanceType' : slave_instance_type,
392 'Instances.InstanceCount' : num_instances,
393 }
394 return params
395
396 def _build_instance_group_args(self, instance_group):
397 """
398 Takes an InstanceGroup; returns a dict that, when its keys are
399 properly prefixed, can be used for describing InstanceGroups in
400 RunJobFlow or AddInstanceGroups requests.
401 """
402 params = {
403 'InstanceCount' : instance_group.num_instances,
404 'InstanceRole' : instance_group.role,
405 'InstanceType' : instance_group.type,
406 'Name' : instance_group.name,
407 'Market' : instance_group.market
408 }
409 if instance_group.market == 'SPOT':
410 params['BidPrice'] = instance_group.bidprice
411 return params
412
413 def _build_instance_group_list_args(self, instance_groups):
414 """
415 Takes a list of InstanceGroups, or a single InstanceGroup. Returns
416 a comparable dict for use in making a RunJobFlow or AddInstanceGroups
417 request.
418 """
419 if type(instance_groups) != types.ListType:
420 instance_groups = [instance_groups]
421
422 params = {}
423 for i, instance_group in enumerate(instance_groups):
424 ig_dict = self._build_instance_group_args(instance_group)
425 for key, value in ig_dict.iteritems():
426 params['InstanceGroups.member.%d.%s' % (i+1, key)] = value
427 return params
OLDNEW
« no previous file with comments | « boto/emr/bootstrap_action.py ('k') | boto/emr/emrobject.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698