| OLD | NEW |
| 1 # Copyright (c) 2010 Spotify AB | 1 # Copyright (c) 2010 Spotify AB |
| 2 # Copyright (c) 2010-2011 Yelp |
| 2 # | 3 # |
| 3 # Permission is hereby granted, free of charge, to any person obtaining a | 4 # Permission is hereby granted, free of charge, to any person obtaining a |
| 4 # copy of this software and associated documentation files (the | 5 # copy of this software and associated documentation files (the |
| 5 # "Software"), to deal in the Software without restriction, including | 6 # "Software"), to deal in the Software without restriction, including |
| 6 # without limitation the rights to use, copy, modify, merge, publish, dis- | 7 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 7 # tribute, sublicense, and/or sell copies of the Software, and to permit | 8 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 8 # persons to whom the Software is furnished to do so, subject to the fol- | 9 # persons to whom the Software is furnished to do so, subject to the fol- |
| 9 # lowing conditions: | 10 # lowing conditions: |
| 10 # | 11 # |
| 11 # The above copyright notice and this permission notice shall be included | 12 # The above copyright notice and this permission notice shall be included |
| (...skipping 75 matching lines...) |
| 87 return args | 88 return args |
| 88 | 89 |
| 89 def main_class(self): | 90 def main_class(self): |
| 90 return self._main_class | 91 return self._main_class |
| 91 | 92 |
| 92 | 93 |
| 93 class StreamingStep(Step): | 94 class StreamingStep(Step): |
| 94 """ | 95 """ |
| 95 Hadoop streaming step | 96 Hadoop streaming step |
| 96 """ | 97 """ |
| 97 def __init__(self, name, mapper, reducer=None, | 98 def __init__(self, name, mapper, reducer=None, combiner=None, |
| 98 action_on_failure='TERMINATE_JOB_FLOW', | 99 action_on_failure='TERMINATE_JOB_FLOW', |
| 99 cache_files=None, cache_archives=None, | 100 cache_files=None, cache_archives=None, |
| 100 step_args=None, input=None, output=None): | 101 step_args=None, input=None, output=None, |
| 102 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'): |
| 101 """ | 103 """ |
| 102 A hadoop streaming elastic mapreduce step | 104 A hadoop streaming elastic mapreduce step |
| 103 | 105 |
| 104 :type name: str | 106 :type name: str |
| 105 :param name: The name of the step | 107 :param name: The name of the step |
| 106 :type mapper: str | 108 :type mapper: str |
| 107 :param mapper: The mapper URI | 109 :param mapper: The mapper URI |
| 108 :type reducer: str | 110 :type reducer: str |
| 109 :param reducer: The reducer URI | 111 :param reducer: The reducer URI |
| 112 :type combiner: str |
| 113 :param combiner: The combiner URI. Only works for Hadoop 0.20 and later! |
| 110 :type action_on_failure: str | 114 :type action_on_failure: str |
| 111 :param action_on_failure: An action, defined in the EMR docs to take on failure. | 115 :param action_on_failure: An action, defined in the EMR docs to take on failure. |
| 112 :type cache_files: list(str) | 116 :type cache_files: list(str) |
| 113 :param cache_files: A list of cache files to be bundled with the job | 117 :param cache_files: A list of cache files to be bundled with the job |
| 114 :type cache_archives: list(str) | 118 :type cache_archives: list(str) |
| 115 :param cache_archives: A list of jar archives to be bundled with the job | 119 :param cache_archives: A list of jar archives to be bundled with the job |
| 116 :type step_args: list(str) | 120 :type step_args: list(str) |
| 117 :param step_args: A list of arguments to pass to the step | 121 :param step_args: A list of arguments to pass to the step |
| 118 :type input: str or a list of str | 122 :type input: str or a list of str |
| 119 :param input: The input uri | 123 :param input: The input uri |
| 120 :type output: str | 124 :type output: str |
| 121 :param output: The output uri | 125 :param output: The output uri |
| 126 :type jar: str |
| 127 :param jar: The hadoop streaming jar. This can be either a local path on the master node, or an s3:// URI. |
| 122 """ | 128 """ |
| 123 self.name = name | 129 self.name = name |
| 124 self.mapper = mapper | 130 self.mapper = mapper |
| 125 self.reducer = reducer | 131 self.reducer = reducer |
| 132 self.combiner = combiner |
| 126 self.action_on_failure = action_on_failure | 133 self.action_on_failure = action_on_failure |
| 127 self.cache_files = cache_files | 134 self.cache_files = cache_files |
| 128 self.cache_archives = cache_archives | 135 self.cache_archives = cache_archives |
| 129 self.input = input | 136 self.input = input |
| 130 self.output = output | 137 self.output = output |
| 138 self._jar = jar |
| 131 | 139 |
| 132 if isinstance(step_args, basestring): | 140 if isinstance(step_args, basestring): |
| 133 step_args = [step_args] | 141 step_args = [step_args] |
| 134 | 142 |
| 135 self.step_args = step_args | 143 self.step_args = step_args |
| 136 | 144 |
| 137 def jar(self): | 145 def jar(self): |
| 138 return '/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar' | 146 return self._jar |
| 139 | 147 |
| 140 def main_class(self): | 148 def main_class(self): |
| 141 return None | 149 return None |
| 142 | 150 |
| 143 def args(self): | 151 def args(self): |
| 144 args = ['-mapper', self.mapper] | 152 args = [] |
| 153 |
| 154 # put extra args BEFORE -mapper and -reducer so that e.g. -libjar |
| 155 # will work |
| 156 if self.step_args: |
| 157 args.extend(self.step_args) |
| 158 |
| 159 args.extend(['-mapper', self.mapper]) |
| 160 |
| 161 if self.combiner: |
| 162 args.extend(['-combiner', self.combiner]) |
| 145 | 163 |
| 146 if self.reducer: | 164 if self.reducer: |
| 147 args.extend(['-reducer', self.reducer]) | 165 args.extend(['-reducer', self.reducer]) |
| 166 else: |
| 167 args.extend(['-jobconf', 'mapred.reduce.tasks=0']) |
| 148 | 168 |
| 149 if self.input: | 169 if self.input: |
| 150 if isinstance(self.input, list): | 170 if isinstance(self.input, list): |
| 151 for input in self.input: | 171 for input in self.input: |
| 152 args.extend(('-input', input)) | 172 args.extend(('-input', input)) |
| 153 else: | 173 else: |
| 154 args.extend(('-input', self.input)) | 174 args.extend(('-input', self.input)) |
| 155 if self.output: | 175 if self.output: |
| 156 args.extend(('-output', self.output)) | 176 args.extend(('-output', self.output)) |
| 157 | 177 |
| 158 if self.cache_files: | 178 if self.cache_files: |
| 159 for cache_file in self.cache_files: | 179 for cache_file in self.cache_files: |
| 160 args.extend(('-cacheFile', cache_file)) | 180 args.extend(('-cacheFile', cache_file)) |
| 161 | 181 |
| 162 if self.cache_archives: | 182 if self.cache_archives: |
| 163 for cache_archive in self.cache_archives: | 183 for cache_archive in self.cache_archives: |
| 164 args.extend(('-cacheArchive', cache_archive)) | 184 args.extend(('-cacheArchive', cache_archive)) |
| 165 | 185 |
| 166 if self.step_args: | |
| 167 args.extend(self.step_args) | |
| 168 | |
| 169 if not self.reducer: | |
| 170 args.extend(['-jobconf', 'mapred.reduce.tasks=0']) | |
| 171 | |
| 172 return args | 186 return args |
| 173 | 187 |
| 174 def __repr__(self): | 188 def __repr__(self): |
| 175 return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r)' % ( | 189 return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % ( |
| 176 self.__class__.__module__, self.__class__.__name__, | 190 self.__class__.__module__, self.__class__.__name__, |
| 177 self.name, self.mapper, self.reducer, self.action_on_failure, | 191 self.name, self.mapper, self.reducer, self.action_on_failure, |
| 178 self.cache_files, self.cache_archives, self.step_args, | 192 self.cache_files, self.cache_archives, self.step_args, |
| 179 self.input, self.output) | 193 self.input, self.output, self._jar) |
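
For context, here is a minimal usage sketch of the post-change StreamingStep API. The bucket name, script paths, and the -libjars value are hypothetical placeholders; the jar argument shown simply overrides the new default of /home/hadoop/contrib/streaming/hadoop-streaming.jar.

    from boto.emr.step import StreamingStep

    # All s3:// URIs below are hypothetical examples.
    step = StreamingStep(
        name='Word count',
        mapper='s3://example-bucket/wordcount/mapper.py',
        combiner='s3://example-bucket/wordcount/combiner.py',  # Hadoop 0.20+ only
        reducer='s3://example-bucket/wordcount/reducer.py',
        input='s3://example-bucket/wordcount/input',
        output='s3://example-bucket/wordcount/output',
        step_args=['-libjars', 's3://example-bucket/lib/extra.jar'],
        jar='s3://example-bucket/jars/hadoop-streaming.jar',
    )

    # Because args() now emits step_args first, generic options such as
    # -libjars precede -mapper/-combiner/-reducer:
    #   ['-libjars', 's3://example-bucket/lib/extra.jar',
    #    '-mapper', 's3://example-bucket/wordcount/mapper.py',
    #    '-combiner', 's3://example-bucket/wordcount/combiner.py',
    #    '-reducer', 's3://example-bucket/wordcount/reducer.py',
    #    '-input', 's3://example-bucket/wordcount/input',
    #    '-output', 's3://example-bucket/wordcount/output']
    print step.args()

The reordering in args() matters because Hadoop's streaming driver expects generic options (e.g. -libjars, -jobconf) before command options, so appending step_args at the end, as the old code did, could cause them to be ignored.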