Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3638)

Unified Diff: boto/emr/step.py

Issue 8386013: Merging in latest boto. (Closed) Base URL: svn://svn.chromium.org/boto
Patch Set: Redoing vendor drop by deleting and then merging. Created 9 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « boto/emr/instance_group.py ('k') | boto/exception.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments / Hide Comments ('s')
Index: boto/emr/step.py
diff --git a/boto/emr/step.py b/boto/emr/step.py
index a444261b1aa06a2866b2fe468b9e871c279ee78c..15dfe8897f8dd1591568b16f7c175ba722a29fb4 100644
--- a/boto/emr/step.py
+++ b/boto/emr/step.py
@@ -1,4 +1,5 @@
# Copyright (c) 2010 Spotify AB
+# Copyright (c) 2010-2011 Yelp
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
@@ -94,10 +95,11 @@ class StreamingStep(Step):
"""
Hadoop streaming step
"""
- def __init__(self, name, mapper, reducer=None,
+ def __init__(self, name, mapper, reducer=None, combiner=None,
action_on_failure='TERMINATE_JOB_FLOW',
cache_files=None, cache_archives=None,
- step_args=None, input=None, output=None):
+ step_args=None, input=None, output=None,
+ jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
"""
A hadoop streaming elastic mapreduce step
@@ -107,6 +109,8 @@ class StreamingStep(Step):
:param mapper: The mapper URI
:type reducer: str
:param reducer: The reducer URI
+ :type combiner: str
+ :param combiner: The combiner URI. Only works for Hadoop 0.20 and later!
:type action_on_failure: str
:param action_on_failure: An action, defined in the EMR docs to take on failure.
:type cache_files: list(str)
@@ -119,15 +123,19 @@ class StreamingStep(Step):
:param input: The input uri
:type output: str
:param output: The output uri
+ :type jar: str
+ :param jar: The hadoop streaming jar. This can be either a local path on the master node, or an s3:// URI.
"""
self.name = name
self.mapper = mapper
self.reducer = reducer
+ self.combiner = combiner
self.action_on_failure = action_on_failure
self.cache_files = cache_files
self.cache_archives = cache_archives
self.input = input
self.output = output
+ self._jar = jar
if isinstance(step_args, basestring):
step_args = [step_args]
@@ -135,16 +143,28 @@ class StreamingStep(Step):
self.step_args = step_args
def jar(self):
- return '/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar'
+ return self._jar
def main_class(self):
return None
def args(self):
- args = ['-mapper', self.mapper]
+ args = []
+
+ # put extra args BEFORE -mapper and -reducer so that e.g. -libjars
+ # will work
+ if self.step_args:
+ args.extend(self.step_args)
+
+ args.extend(['-mapper', self.mapper])
+
+ if self.combiner:
+ args.extend(['-combiner', self.combiner])
if self.reducer:
args.extend(['-reducer', self.reducer])
+ else:
+ args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
if self.input:
if isinstance(self.input, list):
@@ -163,17 +183,11 @@ class StreamingStep(Step):
for cache_archive in self.cache_archives:
args.extend(('-cacheArchive', cache_archive))
- if self.step_args:
- args.extend(self.step_args)
-
- if not self.reducer:
- args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
-
return args
def __repr__(self):
- return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r)' % (
+ return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
self.__class__.__module__, self.__class__.__name__,
self.name, self.mapper, self.reducer, self.action_on_failure,
self.cache_files, self.cache_archives, self.step_args,
- self.input, self.output)
+ self.input, self.output, self._jar)
« no previous file with comments | « boto/emr/instance_group.py ('k') | boto/exception.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698