Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(385)

Side by Side Diff: boto/emr/step.py

Issue 8386013: Merging in latest boto. (Closed) Base URL: svn://svn.chromium.org/boto
Patch Set: Redoing vendor drop by deleting and then merging. Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « boto/emr/instance_group.py ('k') | boto/exception.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright (c) 2010 Spotify AB 1 # Copyright (c) 2010 Spotify AB
2 # Copyright (c) 2010-2011 Yelp
2 # 3 #
3 # Permission is hereby granted, free of charge, to any person obtaining a 4 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the 5 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including 6 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis- 7 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit 8 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol- 9 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions: 10 # lowing conditions:
10 # 11 #
11 # The above copyright notice and this permission notice shall be included 12 # The above copyright notice and this permission notice shall be included
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
87 return args 88 return args
88 89
89 def main_class(self): 90 def main_class(self):
90 return self._main_class 91 return self._main_class
91 92
92 93
93 class StreamingStep(Step): 94 class StreamingStep(Step):
94 """ 95 """
95 Hadoop streaming step 96 Hadoop streaming step
96 """ 97 """
97 def __init__(self, name, mapper, reducer=None, 98 def __init__(self, name, mapper, reducer=None, combiner=None,
98 action_on_failure='TERMINATE_JOB_FLOW', 99 action_on_failure='TERMINATE_JOB_FLOW',
99 cache_files=None, cache_archives=None, 100 cache_files=None, cache_archives=None,
100 step_args=None, input=None, output=None): 101 step_args=None, input=None, output=None,
102 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
101 """ 103 """
102 A hadoop streaming elastic mapreduce step 104 A hadoop streaming elastic mapreduce step
103 105
104 :type name: str 106 :type name: str
105 :param name: The name of the step 107 :param name: The name of the step
106 :type mapper: str 108 :type mapper: str
107 :param mapper: The mapper URI 109 :param mapper: The mapper URI
108 :type reducer: str 110 :type reducer: str
109 :param reducer: The reducer URI 111 :param reducer: The reducer URI
112 :type combiner: str
113 :param combiner: The combiner URI. Only works for Hadoop 0.20 and later!
110 :type action_on_failure: str 114 :type action_on_failure: str
111 :param action_on_failure: An action, defined in the EMR docs to take on failure. 115 :param action_on_failure: An action, defined in the EMR docs to take on failure.
112 :type cache_files: list(str) 116 :type cache_files: list(str)
113 :param cache_files: A list of cache files to be bundled with the job 117 :param cache_files: A list of cache files to be bundled with the job
114 :type cache_archives: list(str) 118 :type cache_archives: list(str)
115 :param cache_archives: A list of jar archives to be bundled with the job 119 :param cache_archives: A list of jar archives to be bundled with the job
116 :type step_args: list(str) 120 :type step_args: list(str)
117 :param step_args: A list of arguments to pass to the step 121 :param step_args: A list of arguments to pass to the step
118 :type input: str or a list of str 122 :type input: str or a list of str
119 :param input: The input uri 123 :param input: The input uri
120 :type output: str 124 :type output: str
121 :param output: The output uri 125 :param output: The output uri
126 :type jar: str
127 :param jar: The hadoop streaming jar. This can be either a local path on the master node, or an s3:// URI.
122 """ 128 """
123 self.name = name 129 self.name = name
124 self.mapper = mapper 130 self.mapper = mapper
125 self.reducer = reducer 131 self.reducer = reducer
132 self.combiner = combiner
126 self.action_on_failure = action_on_failure 133 self.action_on_failure = action_on_failure
127 self.cache_files = cache_files 134 self.cache_files = cache_files
128 self.cache_archives = cache_archives 135 self.cache_archives = cache_archives
129 self.input = input 136 self.input = input
130 self.output = output 137 self.output = output
138 self._jar = jar
131 139
132 if isinstance(step_args, basestring): 140 if isinstance(step_args, basestring):
133 step_args = [step_args] 141 step_args = [step_args]
134 142
135 self.step_args = step_args 143 self.step_args = step_args
136 144
137 def jar(self): 145 def jar(self):
138 return '/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar' 146 return self._jar
139 147
140 def main_class(self): 148 def main_class(self):
141 return None 149 return None
142 150
143 def args(self): 151 def args(self):
144 args = ['-mapper', self.mapper] 152 args = []
153
154 # put extra args BEFORE -mapper and -reducer so that e.g. -libjar
155 # will work
156 if self.step_args:
157 args.extend(self.step_args)
158
159 args.extend(['-mapper', self.mapper])
160
161 if self.combiner:
162 args.extend(['-combiner', self.combiner])
145 163
146 if self.reducer: 164 if self.reducer:
147 args.extend(['-reducer', self.reducer]) 165 args.extend(['-reducer', self.reducer])
166 else:
167 args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
148 168
149 if self.input: 169 if self.input:
150 if isinstance(self.input, list): 170 if isinstance(self.input, list):
151 for input in self.input: 171 for input in self.input:
152 args.extend(('-input', input)) 172 args.extend(('-input', input))
153 else: 173 else:
154 args.extend(('-input', self.input)) 174 args.extend(('-input', self.input))
155 if self.output: 175 if self.output:
156 args.extend(('-output', self.output)) 176 args.extend(('-output', self.output))
157 177
158 if self.cache_files: 178 if self.cache_files:
159 for cache_file in self.cache_files: 179 for cache_file in self.cache_files:
160 args.extend(('-cacheFile', cache_file)) 180 args.extend(('-cacheFile', cache_file))
161 181
162 if self.cache_archives: 182 if self.cache_archives:
163 for cache_archive in self.cache_archives: 183 for cache_archive in self.cache_archives:
164 args.extend(('-cacheArchive', cache_archive)) 184 args.extend(('-cacheArchive', cache_archive))
165 185
166 if self.step_args:
167 args.extend(self.step_args)
168
169 if not self.reducer:
170 args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
171
172 return args 186 return args
173 187
174 def __repr__(self): 188 def __repr__(self):
175 return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cach e_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r)' % ( 189 return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cach e_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
176 self.__class__.__module__, self.__class__.__name__, 190 self.__class__.__module__, self.__class__.__name__,
177 self.name, self.mapper, self.reducer, self.action_on_failure, 191 self.name, self.mapper, self.reducer, self.action_on_failure,
178 self.cache_files, self.cache_archives, self.step_args, 192 self.cache_files, self.cache_archives, self.step_args,
179 self.input, self.output) 193 self.input, self.output, self._jar)
OLDNEW
« no previous file with comments | « boto/emr/instance_group.py ('k') | boto/exception.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698