| Index: boto/emr/connection.py
|
| diff --git a/boto/emr/connection.py b/boto/emr/connection.py
|
| index 2bfd36884acf67897d0bbd2622855a1fc5419b2b..b1effcfc8f39d98869cd45ab93b6f49f20636148 100644
|
| --- a/boto/emr/connection.py
|
| +++ b/boto/emr/connection.py
|
| @@ -1,4 +1,5 @@
|
| # Copyright (c) 2010 Spotify AB
|
| +# Copyright (c) 2010-2011 Yelp
|
| #
|
| # Permission is hereby granted, free of charge, to any person obtaining a
|
| # copy of this software and associated documentation files (the
|
| @@ -25,8 +26,10 @@ Represents a connection to the EMR service
|
| import types
|
|
|
| import boto
|
| +import boto.utils
|
| from boto.ec2.regioninfo import RegionInfo
|
| from boto.emr.emrobject import JobFlow, RunJobFlowResponse
|
| +from boto.emr.emrobject import AddInstanceGroupsResponse, ModifyInstanceGroupsResponse
|
| from boto.emr.step import JarStep
|
| from boto.connection import AWSQueryConnection
|
| from boto.exception import EmrResponseError
|
| @@ -94,9 +97,11 @@ class EmrConnection(AWSQueryConnection):
|
| if jobflow_ids:
|
| self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
|
| if created_after:
|
| - params['CreatedAfter'] = created_after.strftime('%Y-%m-%dT%H:%M:%S')
|
| + params['CreatedAfter'] = created_after.strftime(
|
| + boto.utils.ISO8601)
|
| if created_before:
|
| - params['CreatedBefore'] = created_before.strftime('%Y-%m-%dT%H:%M:%S')
|
| + params['CreatedBefore'] = created_before.strftime(
|
| + boto.utils.ISO8601)
|
|
|
| return self.get_list('DescribeJobFlows', params, [('member', JobFlow)])
|
|
|
| @@ -105,9 +110,9 @@ class EmrConnection(AWSQueryConnection):
|
| Terminate an Elastic MapReduce job flow
|
|
|
| :type jobflow_id: str
|
| - :param jobflow_id: A jobflow id
|
| + :param jobflow_id: A jobflow id
|
| """
|
| - self.terminate_jobflows([jobflow_id])
|
| + self.terminate_jobflows([jobflow_id])
|
|
|
| def terminate_jobflows(self, jobflow_ids):
|
| """
|
| @@ -118,7 +123,7 @@ class EmrConnection(AWSQueryConnection):
|
| """
|
| params = {}
|
| self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
|
| - return self.get_status('TerminateJobFlows', params)
|
| + return self.get_status('TerminateJobFlows', params, verb='POST')
|
|
|
| def add_jobflow_steps(self, jobflow_id, steps):
|
| """
|
| @@ -138,16 +143,61 @@ class EmrConnection(AWSQueryConnection):
|
| step_args = [self._build_step_args(step) for step in steps]
|
| params.update(self._build_step_list(step_args))
|
|
|
| - return self.get_object('AddJobFlowSteps', params, RunJobFlowResponse)
|
| + return self.get_object(
|
| + 'AddJobFlowSteps', params, RunJobFlowResponse, verb='POST')
|
| +
|
| + def add_instance_groups(self, jobflow_id, instance_groups):
|
| + """
|
| + Adds instance groups to a running cluster.
|
| +
|
| + :type jobflow_id: str
|
| + :param jobflow_id: The id of the jobflow which will take the new instance groups
|
| + :type instance_groups: list(boto.emr.InstanceGroup)
|
| + :param instance_groups: A list of instance groups to add to the job
|
| + """
|
| + if type(instance_groups) != types.ListType:
|
| + instance_groups = [instance_groups]
|
| + params = {}
|
| + params['JobFlowId'] = jobflow_id
|
| + params.update(self._build_instance_group_list_args(instance_groups))
|
| +
|
| + return self.get_object('AddInstanceGroups', params, AddInstanceGroupsResponse, verb='POST')
|
| +
|
| + def modify_instance_groups(self, instance_group_ids, new_sizes):
|
| + """
|
| + Modify the number of nodes and configuration settings in an instance group.
|
| +
|
| + :type instance_group_ids: list(str)
|
| + :param instance_group_ids: A list of the IDs of the instance groups to be modified
|
| + :type new_sizes: list(int)
|
| + :param new_sizes: A list of the new sizes for each instance group
|
| + """
|
| + if type(instance_group_ids) != types.ListType:
|
| + instance_group_ids = [instance_group_ids]
|
| + if type(new_sizes) != types.ListType:
|
| + new_sizes = [new_sizes]
|
| +
|
| + instance_groups = zip(instance_group_ids, new_sizes)
|
| +
|
| + params = {}
|
| + for k, ig in enumerate(instance_groups):
|
| + # NOTE: this could be wrong - the example request Amazon gives uses
|
| + # InstanceRequestCount, while the API documentation says InstanceCount (sent here)
|
| + params['InstanceGroups.member.%d.InstanceGroupId' % (k+1) ] = ig[0]
|
| + params['InstanceGroups.member.%d.InstanceCount' % (k+1) ] = ig[1]
|
| +
|
| + return self.get_object('ModifyInstanceGroups', params, ModifyInstanceGroupsResponse, verb='POST')
|
|
|
| def run_jobflow(self, name, log_uri, ec2_keyname=None, availability_zone=None,
|
| master_instance_type='m1.small',
|
| slave_instance_type='m1.small', num_instances=1,
|
| action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
|
| enable_debugging=False,
|
| - hadoop_version='0.18',
|
| + hadoop_version='0.20',
|
| steps=[],
|
| - bootstrap_actions=[]):
|
| + bootstrap_actions=[],
|
| + instance_groups=None,
|
| + additional_info=None):
|
| """
|
| Runs a job flow
|
|
|
| @@ -173,7 +223,14 @@ class EmrConnection(AWSQueryConnection):
|
| :param enable_debugging: Denotes whether AWS console debugging should be enabled.
|
| :type steps: list(boto.emr.Step)
|
| :param steps: List of steps to add with the job
|
| -
|
| + :type bootstrap_actions: list(boto.emr.BootstrapAction)
|
| + :param bootstrap_actions: List of bootstrap actions that run before Hadoop starts.
|
| + :type instance_groups: list(boto.emr.InstanceGroup)
|
| + :param instance_groups: Optional list of instance groups to use when creating
|
| + this job. NB: When provided, this argument supersedes
|
| + num_instances and master/slave_instance_type.
|
| + :type additional_info: JSON str
|
| + :param additional_info: A JSON string for selecting additional features
|
| :rtype: str
|
| :return: The jobflow id
|
| """
|
| @@ -183,11 +240,31 @@ class EmrConnection(AWSQueryConnection):
|
| params['Name'] = name
|
| params['LogUri'] = log_uri
|
|
|
| - # Instance args
|
| - instance_params = self._build_instance_args(ec2_keyname, availability_zone,
|
| - master_instance_type, slave_instance_type,
|
| - num_instances, keep_alive, hadoop_version)
|
| - params.update(instance_params)
|
| + # Common instance args
|
| + common_params = self._build_instance_common_args(ec2_keyname,
|
| + availability_zone,
|
| + keep_alive, hadoop_version)
|
| + params.update(common_params)
|
| +
|
| + # NB: according to the AWS API's error message, we must
|
| + # "configure instances either using instance count, master and
|
| + # slave instance type or instance groups but not both."
|
| + #
|
| + # Thus we switch here on the truthiness of instance_groups.
|
| + if not instance_groups:
|
| + # Instance args (the common case)
|
| + instance_params = self._build_instance_count_and_type_args(
|
| + master_instance_type,
|
| + slave_instance_type,
|
| + num_instances)
|
| + params.update(instance_params)
|
| + else:
|
| + # Instance group args (for spot instances or a heterogeneous cluster)
|
| + list_args = self._build_instance_group_list_args(instance_groups)
|
| + instance_params = dict(
|
| + ('Instances.%s' % k, v) for k, v in list_args.iteritems()
|
| + )
|
| + params.update(instance_params)
|
|
|
| # Debugging step from EMR API docs
|
| if enable_debugging:
|
| @@ -207,9 +284,31 @@ class EmrConnection(AWSQueryConnection):
|
| bootstrap_action_args = [self._build_bootstrap_action_args(bootstrap_action) for bootstrap_action in bootstrap_actions]
|
| params.update(self._build_bootstrap_action_list(bootstrap_action_args))
|
|
|
| - response = self.get_object('RunJobFlow', params, RunJobFlowResponse)
|
| + if additional_info is not None:
|
| + params['AdditionalInfo'] = additional_info
|
| +
|
| + response = self.get_object(
|
| + 'RunJobFlow', params, RunJobFlowResponse, verb='POST')
|
| return response.jobflowid
|
|
|
| + def set_termination_protection(self, jobflow_id, termination_protection_status):
|
| + """
|
| + Set termination protection on the specified Elastic MapReduce job flow
|
| +
|
| + :type jobflow_id: str
|
| + :param jobflow_id: A job flow ID
|
| + :type termination_protection_status: bool
|
| + :param termination_protection_status: Termination protection status
|
| + """
|
| + assert termination_protection_status in (True, False)
|
| +
|
| + params = {}
|
| + params['TerminationProtected'] = (termination_protection_status and "true") or "false"
|
| + self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')
|
| +
|
| + return self.get_status('SetTerminationProtection', params, verb='POST')
|
| +
|
| +
|
| def _build_bootstrap_action_args(self, bootstrap_action):
|
| bootstrap_action_params = {}
|
| bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action.path
|
| @@ -248,7 +347,7 @@ class EmrConnection(AWSQueryConnection):
|
| params = {}
|
| for i, bootstrap_action in enumerate(bootstrap_actions):
|
| for key, value in bootstrap_action.iteritems():
|
| - params['BootstrapActions.memeber.%s.%s' % (i + 1, key)] = value
|
| + params['BootstrapActions.member.%s.%s' % (i + 1, key)] = value
|
| return params
|
|
|
| def _build_step_list(self, steps):
|
| @@ -258,15 +357,17 @@ class EmrConnection(AWSQueryConnection):
|
| params = {}
|
| for i, step in enumerate(steps):
|
| for key, value in step.iteritems():
|
| - params['Steps.memeber.%s.%s' % (i+1, key)] = value
|
| + params['Steps.member.%s.%s' % (i+1, key)] = value
|
| return params
|
|
|
| - def _build_instance_args(self, ec2_keyname, availability_zone, master_instance_type,
|
| - slave_instance_type, num_instances, keep_alive, hadoop_version):
|
| + def _build_instance_common_args(self, ec2_keyname, availability_zone,
|
| + keep_alive, hadoop_version):
|
| + """
|
| + Takes a number of parameters used when starting a jobflow (as
|
| + specified in run_jobflow() above). Returns a corresponding dict for
|
| + use in making a RunJobFlow request.
|
| + """
|
| params = {
|
| - 'Instances.MasterInstanceType' : master_instance_type,
|
| - 'Instances.SlaveInstanceType' : slave_instance_type,
|
| - 'Instances.InstanceCount' : num_instances,
|
| 'Instances.KeepJobFlowAliveWhenNoSteps' : str(keep_alive).lower(),
|
| 'Instances.HadoopVersion' : hadoop_version
|
| }
|
| @@ -274,7 +375,53 @@ class EmrConnection(AWSQueryConnection):
|
| if ec2_keyname:
|
| params['Instances.Ec2KeyName'] = ec2_keyname
|
| if availability_zone:
|
| - params['Placement'] = availability_zone
|
| + params['Instances.Placement.AvailabilityZone'] = availability_zone
|
| +
|
| + return params
|
| +
|
| + def _build_instance_count_and_type_args(self, master_instance_type,
|
| + slave_instance_type, num_instances):
|
| + """
|
| + Takes a master instance type (string), a slave instance type
|
| + (string), and a number of instances. Returns a corresponding dict
|
| + for use in making a RunJobFlow request.
|
| + """
|
| + params = {
|
| + 'Instances.MasterInstanceType' : master_instance_type,
|
| + 'Instances.SlaveInstanceType' : slave_instance_type,
|
| + 'Instances.InstanceCount' : num_instances,
|
| + }
|
| + return params
|
|
|
| + def _build_instance_group_args(self, instance_group):
|
| + """
|
| + Takes an InstanceGroup; returns a dict that, when its keys are
|
| + properly prefixed, can be used for describing InstanceGroups in
|
| + RunJobFlow or AddInstanceGroups requests.
|
| + """
|
| + params = {
|
| + 'InstanceCount' : instance_group.num_instances,
|
| + 'InstanceRole' : instance_group.role,
|
| + 'InstanceType' : instance_group.type,
|
| + 'Name' : instance_group.name,
|
| + 'Market' : instance_group.market
|
| + }
|
| + if instance_group.market == 'SPOT':
|
| + params['BidPrice'] = instance_group.bidprice
|
| return params
|
|
|
| + def _build_instance_group_list_args(self, instance_groups):
|
| + """
|
| + Takes a list of InstanceGroups, or a single InstanceGroup. Returns
|
| + a corresponding dict for use in making a RunJobFlow or AddInstanceGroups
|
| + request.
|
| + """
|
| + if type(instance_groups) != types.ListType:
|
| + instance_groups = [instance_groups]
|
| +
|
| + params = {}
|
| + for i, instance_group in enumerate(instance_groups):
|
| + ig_dict = self._build_instance_group_args(instance_group)
|
| + for key, value in ig_dict.iteritems():
|
| + params['InstanceGroups.member.%d.%s' % (i+1, key)] = value
|
| + return params
|
|
|