Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(868)

Unified Diff: boto/emr/connection.py

Issue 8386013: Merging in latest boto. (Closed) Base URL: svn://svn.chromium.org/boto
Patch Set: Redoing vendor drop by deleting and then merging. Created 9 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « boto/emr/bootstrap_action.py ('k') | boto/emr/emrobject.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: boto/emr/connection.py
diff --git a/boto/emr/connection.py b/boto/emr/connection.py
index 2bfd36884acf67897d0bbd2622855a1fc5419b2b..b1effcfc8f39d98869cd45ab93b6f49f20636148 100644
--- a/boto/emr/connection.py
+++ b/boto/emr/connection.py
@@ -1,4 +1,5 @@
# Copyright (c) 2010 Spotify AB
+# Copyright (c) 2010-2011 Yelp
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
@@ -25,8 +26,10 @@ Represents a connection to the EMR service
import types
import boto
+import boto.utils
from boto.ec2.regioninfo import RegionInfo
from boto.emr.emrobject import JobFlow, RunJobFlowResponse
+from boto.emr.emrobject import AddInstanceGroupsResponse, ModifyInstanceGroupsResponse
from boto.emr.step import JarStep
from boto.connection import AWSQueryConnection
from boto.exception import EmrResponseError
@@ -94,9 +97,11 @@ class EmrConnection(AWSQueryConnection):
if jobflow_ids:
self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
if created_after:
- params['CreatedAfter'] = created_after.strftime('%Y-%m-%dT%H:%M:%S')
+ params['CreatedAfter'] = created_after.strftime(
+ boto.utils.ISO8601)
if created_before:
- params['CreatedBefore'] = created_before.strftime('%Y-%m-%dT%H:%M:%S')
+ params['CreatedBefore'] = created_before.strftime(
+ boto.utils.ISO8601)
return self.get_list('DescribeJobFlows', params, [('member', JobFlow)])
@@ -105,9 +110,9 @@ class EmrConnection(AWSQueryConnection):
Terminate an Elastic MapReduce job flow
:type jobflow_id: str
- :param jobflow_id: A jobflow id
+ :param jobflow_id: A jobflow id
"""
- self.terminate_jobflows([jobflow_id])
+ self.terminate_jobflows([jobflow_id])
def terminate_jobflows(self, jobflow_ids):
"""
@@ -118,7 +123,7 @@ class EmrConnection(AWSQueryConnection):
"""
params = {}
self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
- return self.get_status('TerminateJobFlows', params)
+ return self.get_status('TerminateJobFlows', params, verb='POST')
def add_jobflow_steps(self, jobflow_id, steps):
"""
@@ -138,16 +143,61 @@ class EmrConnection(AWSQueryConnection):
step_args = [self._build_step_args(step) for step in steps]
params.update(self._build_step_list(step_args))
- return self.get_object('AddJobFlowSteps', params, RunJobFlowResponse)
+ return self.get_object(
+ 'AddJobFlowSteps', params, RunJobFlowResponse, verb='POST')
+
+ def add_instance_groups(self, jobflow_id, instance_groups):
+ """
+ Adds instance groups to a running cluster.
+
+ :type jobflow_id: str
+ :param jobflow_id: The id of the jobflow which will take the new instance groups
+ :type instance_groups: list(boto.emr.InstanceGroup)
+ :param instance_groups: A list of instance groups to add to the job
+ """
+ if type(instance_groups) != types.ListType:
+ instance_groups = [instance_groups]
+ params = {}
+ params['JobFlowId'] = jobflow_id
+ params.update(self._build_instance_group_list_args(instance_groups))
+
+ return self.get_object('AddInstanceGroups', params, AddInstanceGroupsResponse, verb='POST')
+
+ def modify_instance_groups(self, instance_group_ids, new_sizes):
+ """
+ Modify the number of nodes and configuration settings in an instance group.
+
+ :type instance_group_ids: list(str)
+ :param instance_group_ids: A list of the ID's of the instance groups to be modified
+ :type new_sizes: list(int)
+ :param new_sizes: A list of the new sizes for each instance group
+ """
+ if type(instance_group_ids) != types.ListType:
+ instance_group_ids = [instance_group_ids]
+ if type(new_sizes) != types.ListType:
+ new_sizes = [new_sizes]
+
+ instance_groups = zip(instance_group_ids, new_sizes)
+
+ params = {}
+ for k, ig in enumerate(instance_groups):
+ #could be wrong - the example amazon gives uses InstanceRequestCount,
+ #while the api documentation says InstanceCount
+ params['InstanceGroups.member.%d.InstanceGroupId' % (k+1) ] = ig[0]
+ params['InstanceGroups.member.%d.InstanceCount' % (k+1) ] = ig[1]
+
+ return self.get_object('ModifyInstanceGroups', params, ModifyInstanceGroupsResponse, verb='POST')
def run_jobflow(self, name, log_uri, ec2_keyname=None, availability_zone=None,
master_instance_type='m1.small',
slave_instance_type='m1.small', num_instances=1,
action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
enable_debugging=False,
- hadoop_version='0.18',
+ hadoop_version='0.20',
steps=[],
- bootstrap_actions=[]):
+ bootstrap_actions=[],
+ instance_groups=None,
+ additional_info=None):
"""
Runs a job flow
@@ -173,7 +223,14 @@ class EmrConnection(AWSQueryConnection):
:param enable_debugging: Denotes whether AWS console debugging should be enabled.
:type steps: list(boto.emr.Step)
:param steps: List of steps to add with the job
-
+ :type bootstrap_actions: list(boto.emr.BootstrapAction)
+ :param bootstrap_actions: List of bootstrap actions that run before Hadoop starts.
+ :type instance_groups: list(boto.emr.InstanceGroup)
+ :param instance_groups: Optional list of instance groups to use when creating
+ this job. NB: When provided, this argument supersedes
+ num_instances and master/slave_instance_type.
+ :type additional_info: JSON str
+ :param additional_info: A JSON string for selecting additional features
:rtype: str
:return: The jobflow id
"""
@@ -183,11 +240,31 @@ class EmrConnection(AWSQueryConnection):
params['Name'] = name
params['LogUri'] = log_uri
- # Instance args
- instance_params = self._build_instance_args(ec2_keyname, availability_zone,
- master_instance_type, slave_instance_type,
- num_instances, keep_alive, hadoop_version)
- params.update(instance_params)
+ # Common instance args
+ common_params = self._build_instance_common_args(ec2_keyname,
+ availability_zone,
+ keep_alive, hadoop_version)
+ params.update(common_params)
+
+ # NB: according to the AWS API's error message, we must
+ # "configure instances either using instance count, master and
+ # slave instance type or instance groups but not both."
+ #
+ # Thus we switch here on the truthiness of instance_groups.
+ if not instance_groups:
+ # Instance args (the common case)
+ instance_params = self._build_instance_count_and_type_args(
+ master_instance_type,
+ slave_instance_type,
+ num_instances)
+ params.update(instance_params)
+ else:
+ # Instance group args (for spot instances or a heterogenous cluster)
+ list_args = self._build_instance_group_list_args(instance_groups)
+ instance_params = dict(
+ ('Instances.%s' % k, v) for k, v in list_args.iteritems()
+ )
+ params.update(instance_params)
# Debugging step from EMR API docs
if enable_debugging:
@@ -207,9 +284,31 @@ class EmrConnection(AWSQueryConnection):
bootstrap_action_args = [self._build_bootstrap_action_args(bootstrap_action) for bootstrap_action in bootstrap_actions]
params.update(self._build_bootstrap_action_list(bootstrap_action_args))
- response = self.get_object('RunJobFlow', params, RunJobFlowResponse)
+ if additional_info is not None:
+ params['AdditionalInfo'] = additional_info
+
+ response = self.get_object(
+ 'RunJobFlow', params, RunJobFlowResponse, verb='POST')
return response.jobflowid
+ def set_termination_protection(self, jobflow_id, termination_protection_status):
+ """
+ Set termination protection on specified Elastic MapReduce job flows
+
+ :type jobflow_ids: list or str
+ :param jobflow_ids: A list of job flow IDs
+ :type termination_protection_status: bool
+ :param termination_protection_status: Termination protection status
+ """
+ assert termination_protection_status in (True, False)
+
+ params = {}
+ params['TerminationProtected'] = (termination_protection_status and "true") or "false"
+ self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')
+
+ return self.get_status('SetTerminationProtection', params, verb='POST')
+
+
def _build_bootstrap_action_args(self, bootstrap_action):
bootstrap_action_params = {}
bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action.path
@@ -248,7 +347,7 @@ class EmrConnection(AWSQueryConnection):
params = {}
for i, bootstrap_action in enumerate(bootstrap_actions):
for key, value in bootstrap_action.iteritems():
- params['BootstrapActions.memeber.%s.%s' % (i + 1, key)] = value
+ params['BootstrapActions.member.%s.%s' % (i + 1, key)] = value
return params
def _build_step_list(self, steps):
@@ -258,15 +357,17 @@ class EmrConnection(AWSQueryConnection):
params = {}
for i, step in enumerate(steps):
for key, value in step.iteritems():
- params['Steps.memeber.%s.%s' % (i+1, key)] = value
+ params['Steps.member.%s.%s' % (i+1, key)] = value
return params
- def _build_instance_args(self, ec2_keyname, availability_zone, master_instance_type,
- slave_instance_type, num_instances, keep_alive, hadoop_version):
+ def _build_instance_common_args(self, ec2_keyname, availability_zone,
+ keep_alive, hadoop_version):
+ """
+ Takes a number of parameters used when starting a jobflow (as
+ specified in run_jobflow() above). Returns a comparable dict for
+ use in making a RunJobFlow request.
+ """
params = {
- 'Instances.MasterInstanceType' : master_instance_type,
- 'Instances.SlaveInstanceType' : slave_instance_type,
- 'Instances.InstanceCount' : num_instances,
'Instances.KeepJobFlowAliveWhenNoSteps' : str(keep_alive).lower(),
'Instances.HadoopVersion' : hadoop_version
}
@@ -274,7 +375,53 @@ class EmrConnection(AWSQueryConnection):
if ec2_keyname:
params['Instances.Ec2KeyName'] = ec2_keyname
if availability_zone:
- params['Placement'] = availability_zone
+ params['Instances.Placement.AvailabilityZone'] = availability_zone
+
+ return params
+
+ def _build_instance_count_and_type_args(self, master_instance_type,
+ slave_instance_type, num_instances):
+ """
+ Takes a master instance type (string), a slave instance type
+ (string), and a number of instances. Returns a comparable dict
+ for use in making a RunJobFlow request.
+ """
+ params = {
+ 'Instances.MasterInstanceType' : master_instance_type,
+ 'Instances.SlaveInstanceType' : slave_instance_type,
+ 'Instances.InstanceCount' : num_instances,
+ }
+ return params
+ def _build_instance_group_args(self, instance_group):
+ """
+ Takes an InstanceGroup; returns a dict that, when its keys are
+ properly prefixed, can be used for describing InstanceGroups in
+ RunJobFlow or AddInstanceGroups requests.
+ """
+ params = {
+ 'InstanceCount' : instance_group.num_instances,
+ 'InstanceRole' : instance_group.role,
+ 'InstanceType' : instance_group.type,
+ 'Name' : instance_group.name,
+ 'Market' : instance_group.market
+ }
+ if instance_group.market == 'SPOT':
+ params['BidPrice'] = instance_group.bidprice
return params
+ def _build_instance_group_list_args(self, instance_groups):
+ """
+ Takes a list of InstanceGroups, or a single InstanceGroup. Returns
+ a comparable dict for use in making a RunJobFlow or AddInstanceGroups
+ request.
+ """
+ if type(instance_groups) != types.ListType:
+ instance_groups = [instance_groups]
+
+ params = {}
+ for i, instance_group in enumerate(instance_groups):
+ ig_dict = self._build_instance_group_args(instance_group)
+ for key, value in ig_dict.iteritems():
+ params['InstanceGroups.member.%d.%s' % (i+1, key)] = value
+ return params
« no previous file with comments | « boto/emr/bootstrap_action.py ('k') | boto/emr/emrobject.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698