Index: boto/emr/step.py
diff --git a/boto/emr/step.py b/boto/emr/step.py
index a444261b1aa06a2866b2fe468b9e871c279ee78c..15dfe8897f8dd1591568b16f7c175ba722a29fb4 100644
--- a/boto/emr/step.py
+++ b/boto/emr/step.py
@@ -1,4 +1,5 @@
 # Copyright (c) 2010 Spotify AB
+# Copyright (c) 2010-2011 Yelp
 #
 # Permission is hereby granted, free of charge, to any person obtaining a
 # copy of this software and associated documentation files (the
@@ -94,10 +95,11 @@ class StreamingStep(Step):
     """
     Hadoop streaming step
     """
-    def __init__(self, name, mapper, reducer=None,
+    def __init__(self, name, mapper, reducer=None, combiner=None,
                   action_on_failure='TERMINATE_JOB_FLOW',
                   cache_files=None, cache_archives=None,
-                  step_args=None, input=None, output=None):
+                  step_args=None, input=None, output=None,
+                  jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
         """
         A hadoop streaming elastic mapreduce step
 
@@ -107,6 +109,8 @@ class StreamingStep(Step):
         :param mapper: The mapper URI
         :type reducer: str
         :param reducer: The reducer URI
+        :type combiner: str
+        :param combiner: The combiner URI. Only works for Hadoop 0.20 and later!
         :type action_on_failure: str
         :param action_on_failure: An action, defined in the EMR docs to take on failure.
         :type cache_files: list(str)
@@ -119,15 +123,19 @@ class StreamingStep(Step):
         :param input: The input uri
         :type output: str
         :param output: The output uri
+        :type jar: str
+        :param jar: The hadoop streaming jar. This can be either a local path on the master node, or an s3:// URI.
         """
         self.name = name
         self.mapper = mapper
         self.reducer = reducer
+        self.combiner = combiner
         self.action_on_failure = action_on_failure
         self.cache_files = cache_files
         self.cache_archives = cache_archives
         self.input = input
         self.output = output
+        self._jar = jar
 
         if isinstance(step_args, basestring):
             step_args = [step_args]
@@ -135,16 +143,28 @@ class StreamingStep(Step):
         self.step_args = step_args
 
     def jar(self):
-        return '/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar'
+        return self._jar
 
     def main_class(self):
         return None
 
     def args(self):
-        args = ['-mapper', self.mapper]
+        args = []
+
+        # put extra args BEFORE -mapper and -reducer so that e.g. -libjar
+        # will work
+        if self.step_args:
+            args.extend(self.step_args)
+
+        args.extend(['-mapper', self.mapper])
+
+        if self.combiner:
+            args.extend(['-combiner', self.combiner])
 
         if self.reducer:
             args.extend(['-reducer', self.reducer])
+        else:
+            args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
 
         if self.input:
             if isinstance(self.input, list):
@@ -163,17 +183,11 @@ class StreamingStep(Step):
             for cache_archive in self.cache_archives:
                 args.extend(('-cacheArchive', cache_archive))
 
-        if self.step_args:
-            args.extend(self.step_args)
-
-        if not self.reducer:
-            args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
-
         return args
 
     def __repr__(self):
-        return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r)' % (
+        return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
             self.__class__.__module__, self.__class__.__name__,
             self.name, self.mapper, self.reducer, self.action_on_failure,
             self.cache_files, self.cache_archives, self.step_args,
-            self.input, self.output)
+            self.input, self.output, self._jar)
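
A minimal usage sketch of the new parameters (illustrative only, not part of the patch; the bucket names and script URIs below are made up): with this change a streaming step can declare a combiner and point at a non-default streaming jar, and jar() simply returns whatever was passed in.

    from boto.emr.step import StreamingStep

    step = StreamingStep(
        name='Word count',
        mapper='s3://example-bucket/wc_mapper.py',
        combiner='s3://example-bucket/wc_reducer.py',  # combiner needs Hadoop 0.20+
        reducer='s3://example-bucket/wc_reducer.py',
        input='s3://example-bucket/input/',
        output='s3://example-bucket/output/',
        jar='s3://example-bucket/jars/hadoop-streaming.jar')

    step.jar()   # 's3://example-bucket/jars/hadoop-streaming.jar'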
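The args() rework also changes ordering: step_args now come first so that generic options such as -libjars take effect before -mapper and -reducer, and a step without a reducer now gets '-jobconf mapred.reduce.tasks=0' right after the reducer check rather than at the end of the list. A rough sketch of the expected result for a mapper-only step, again with hypothetical values and assuming the patched class above:

    step = StreamingStep(
        name='Filter only',
        mapper='s3://example-bucket/filter.py',
        step_args=['-libjars', 's3://example-bucket/extra.jar'],
        input='s3://example-bucket/input/',
        output='s3://example-bucket/filtered/')

    step.args()
    # ['-libjars', 's3://example-bucket/extra.jar',
    #  '-mapper', 's3://example-bucket/filter.py',
    #  '-jobconf', 'mapred.reduce.tasks=0',
    #  '-input', 's3://example-bucket/input/',
    #  '-output', 's3://example-bucket/filtered/']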