Index: boto/emr/step.py
diff --git a/boto/emr/step.py b/boto/emr/step.py
index a444261b1aa06a2866b2fe468b9e871c279ee78c..15dfe8897f8dd1591568b16f7c175ba722a29fb4 100644
--- a/boto/emr/step.py
+++ b/boto/emr/step.py
@@ -1,4 +1,5 @@
 # Copyright (c) 2010 Spotify AB
+# Copyright (c) 2010-2011 Yelp
 #
 # Permission is hereby granted, free of charge, to any person obtaining a
 # copy of this software and associated documentation files (the
@@ -94,10 +95,11 @@ class StreamingStep(Step):
     """
     Hadoop streaming step
     """
-    def __init__(self, name, mapper, reducer=None,
+    def __init__(self, name, mapper, reducer=None, combiner=None,
                  action_on_failure='TERMINATE_JOB_FLOW',
                  cache_files=None, cache_archives=None,
-                 step_args=None, input=None, output=None):
+                 step_args=None, input=None, output=None,
+                 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
         """
         A hadoop streaming elastic mapreduce step

@@ -107,6 +109,8 @@ class StreamingStep(Step):
         :param mapper: The mapper URI
         :type reducer: str
         :param reducer: The reducer URI
+        :type combiner: str
+        :param combiner: The combiner URI. Only works for Hadoop 0.20 and later!
         :type action_on_failure: str
         :param action_on_failure: An action, defined in the EMR docs, to take on failure.
         :type cache_files: list(str)
@@ -119,15 +123,19 @@ class StreamingStep(Step):
         :param input: The input URI
         :type output: str
         :param output: The output URI
+        :type jar: str
+        :param jar: The Hadoop streaming jar. This can be either a local path on the master node or an s3:// URI.
         """
         self.name = name
         self.mapper = mapper
         self.reducer = reducer
+        self.combiner = combiner
         self.action_on_failure = action_on_failure
         self.cache_files = cache_files
         self.cache_archives = cache_archives
         self.input = input
         self.output = output
+        self._jar = jar

         if isinstance(step_args, basestring):
             step_args = [step_args]
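
For reference, a minimal sketch of a step built with the two new parameters. This is illustrative only; the bucket, script, and jar URIs below are placeholders, not part of the change:

    from boto.emr.step import StreamingStep

    # Placeholder URIs; substitute your own buckets and scripts.
    step = StreamingStep(
        name='Wordcount with combiner',
        mapper='s3://mybucket/scripts/wc-mapper.py',
        combiner='s3://mybucket/scripts/wc-combiner.py',  # Hadoop 0.20+ only
        reducer='s3://mybucket/scripts/wc-reducer.py',
        input='s3://mybucket/wordcount/input',
        output='s3://mybucket/wordcount/output',
        # optionally override the default local jar path with an s3:// URI
        jar='s3://mybucket/jars/hadoop-streaming.jar')
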
@@ -135,16 +143,28 @@ class StreamingStep(Step):
         self.step_args = step_args

     def jar(self):
-        return '/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar'
+        return self._jar

     def main_class(self):
         return None

     def args(self):
-        args = ['-mapper', self.mapper]
-
+        args = []
+
+        # put extra args BEFORE -mapper and -reducer so that e.g. -libjar
+        # will work
+        if self.step_args:
+            args.extend(self.step_args)
+
+        args.extend(['-mapper', self.mapper])
+
+        if self.combiner:
+            args.extend(['-combiner', self.combiner])
+
         if self.reducer:
             args.extend(['-reducer', self.reducer])
+        else:
+            args.extend(['-jobconf', 'mapred.reduce.tasks=0'])

         if self.input:
             if isinstance(self.input, list):
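
The reordering in this hunk puts step_args ahead of -mapper and -reducer, which matters for flags such as -libjars that Hadoop streaming expects before the mapper/reducer options. A quick sketch of the resulting argument list for a mapper-only step (placeholder URIs again):

    step = StreamingStep(
        name='Mapper-only step',
        mapper='s3://mybucket/scripts/filter.py',
        step_args=['-libjars', 's3://mybucket/jars/extra.jar'])

    # step_args now come first, then -mapper; with no reducer configured,
    # args() appends the zero-reducer jobconf:
    print(step.args())
    # ['-libjars', 's3://mybucket/jars/extra.jar',
    #  '-mapper', 's3://mybucket/scripts/filter.py',
    #  '-jobconf', 'mapred.reduce.tasks=0']
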
@@ -163,17 +183,11 @@ class StreamingStep(Step):
             for cache_archive in self.cache_archives:
                 args.extend(('-cacheArchive', cache_archive))

-        if self.step_args:
-            args.extend(self.step_args)
-
-        if not self.reducer:
-            args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
-
         return args

     def __repr__(self):
-        return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r)' % (
+        return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
             self.__class__.__module__, self.__class__.__name__,
             self.name, self.mapper, self.reducer, self.action_on_failure,
             self.cache_files, self.cache_archives, self.step_args,
-            self.input, self.output)
+            self.input, self.output, self._jar)
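
End to end, a step built this way is submitted like any other. A hedged sketch, assuming credentials come from the usual boto configuration, that the boto version in use exposes run_jobflow's hadoop_version parameter, and placeholder bucket names:

    import boto

    conn = boto.connect_emr()
    jobflowid = conn.run_jobflow(
        name='streaming job flow',
        log_uri='s3://mybucket/logs/',
        hadoop_version='0.20',  # assumed param; -combiner needs Hadoop 0.20+
        steps=[step])
    print(jobflowid)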