OLD | NEW |
1 # Copyright (c) 2010 Spotify AB | 1 # Copyright (c) 2010 Spotify AB |
| 2 # Copyright (c) 2010-2011 Yelp |
2 # | 3 # |
3 # Permission is hereby granted, free of charge, to any person obtaining a | 4 # Permission is hereby granted, free of charge, to any person obtaining a |
4 # copy of this software and associated documentation files (the | 5 # copy of this software and associated documentation files (the |
5 # "Software"), to deal in the Software without restriction, including | 6 # "Software"), to deal in the Software without restriction, including |
6 # without limitation the rights to use, copy, modify, merge, publish, dis- | 7 # without limitation the rights to use, copy, modify, merge, publish, dis- |
7 # tribute, sublicense, and/or sell copies of the Software, and to permit | 8 # tribute, sublicense, and/or sell copies of the Software, and to permit |
8 # persons to whom the Software is furnished to do so, subject to the fol- | 9 # persons to whom the Software is furnished to do so, subject to the fol- |
9 # lowing conditions: | 10 # lowing conditions: |
10 # | 11 # |
11 # The above copyright notice and this permission notice shall be included | 12 # The above copyright notice and this permission notice shall be included |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
87 return args | 88 return args |
88 | 89 |
def main_class(self):
    """Return the main class for this step.

    NOTE(review): the enclosing class is outside this view; presumably this
    is the jar-based step type, where ``_main_class`` is set by
    ``__init__`` — confirm against the full file.
    """
    return self._main_class
92 | 93 |
class StreamingStep(Step):
    """
    Hadoop streaming step
    """
    def __init__(self, name, mapper, reducer=None, combiner=None,
                 action_on_failure='TERMINATE_JOB_FLOW',
                 cache_files=None, cache_archives=None,
                 step_args=None, input=None, output=None,
                 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
        """
        A hadoop streaming elastic mapreduce step

        :type name: str
        :param name: The name of the step
        :type mapper: str
        :param mapper: The mapper URI
        :type reducer: str
        :param reducer: The reducer URI
        :type combiner: str
        :param combiner: The combiner URI. Only works for Hadoop 0.20 and later!
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs to take on failure.
        :type cache_files: list(str)
        :param cache_files: A list of cache files to be bundled with the job
        :type cache_archives: list(str)
        :param cache_archives: A list of jar archives to be bundled with the job
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        :type input: str or a list of str
        :param input: The input uri
        :type output: str
        :param output: The output uri
        :type jar: str
        :param jar: The hadoop streaming jar. This can be either a local path on
            the master node, or an s3:// URI.
        """
        self.name = name
        self.mapper = mapper
        self.reducer = reducer
        self.combiner = combiner
        self.action_on_failure = action_on_failure
        self.cache_files = cache_files
        self.cache_archives = cache_archives
        self.input = input
        self.output = output
        self._jar = jar

        # A bare string is treated as a single argument rather than being
        # iterated character-by-character.
        if isinstance(step_args, basestring):
            step_args = [step_args]

        self.step_args = step_args

    def jar(self):
        """Return the streaming jar location (local path or s3:// URI)."""
        return self._jar

    def main_class(self):
        """Streaming steps have no main class; the jar's manifest is used."""
        return None

    def args(self):
        """Build the argument list passed to the hadoop-streaming jar."""
        args = []

        # put extra args BEFORE -mapper and -reducer so that e.g. -libjar
        # will work
        if self.step_args:
            args.extend(self.step_args)

        args.extend(['-mapper', self.mapper])

        if self.combiner:
            args.extend(['-combiner', self.combiner])

        if self.reducer:
            args.extend(['-reducer', self.reducer])
        else:
            # Map-only job: tell Hadoop to run zero reduce tasks so mapper
            # output is written directly.
            args.extend(['-jobconf', 'mapred.reduce.tasks=0'])

        if self.input:
            # Multiple inputs are allowed; each gets its own -input flag.
            if isinstance(self.input, list):
                for input_uri in self.input:
                    args.extend(('-input', input_uri))
            else:
                args.extend(('-input', self.input))
        if self.output:
            args.extend(('-output', self.output))

        if self.cache_files:
            for cache_file in self.cache_files:
                args.extend(('-cacheFile', cache_file))

        if self.cache_archives:
            for cache_archive in self.cache_archives:
                args.extend(('-cacheArchive', cache_archive))

        return args

    def __repr__(self):
        # BUGFIX: include combiner (added alongside jar in this change) so the
        # repr reflects every constructor argument.
        return '%s.%s(name=%r, mapper=%r, reducer=%r, combiner=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
            self.__class__.__module__, self.__class__.__name__,
            self.name, self.mapper, self.reducer, self.combiner,
            self.action_on_failure, self.cache_files, self.cache_archives,
            self.step_args, self.input, self.output, self._jar)
OLD | NEW |