Chromium Code Reviews| Index: build/android/pylib/utils/json_results_generator.py |
| diff --git a/build/android/pylib/utils/json_results_generator.py b/build/android/pylib/utils/json_results_generator.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..cd3944674b70f953abe849aa1418639e78b581c0 |
| --- /dev/null |
| +++ b/build/android/pylib/utils/json_results_generator.py |
| @@ -0,0 +1,686 @@ |
| +# Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +# |
| +# Most of this file was ported over from Blink's |
| +# Tools/Scripts/webkitpy/layout_tests/layout_package/json_results_generator.py |
| +# Tools/Scripts/webkitpy/common/net/file_uploader.py |
| +# |
| + |
| +import json |
| +import logging |
| +import mimetypes |
| +import os |
| +import subprocess |
| +import sys |
| +import time |
| +import urllib2 |
| +import xml.dom.minidom |
| + |
| +_log = logging.getLogger(__name__) |
| + |
| + |
| +def write_json(json_object, file_path, callback=None): |
| + # Specify separators in order to get compact encoding. |
|
bulach
2014/06/20 01:42:36
not sure how much should we care about chromi-fy t
|
| + json_string = json.dumps(json_object, separators=(',', ':')) |
| + if callback: |
| + json_string = callback + "(" + json_string + ");" |
| + with open(file_path, 'w') as fp: |
| + fp.write(json_string) |
| + |
| + |
| +def convert_trie_to_flat_paths(trie, prefix=None): |
| + """Converts the directory structure in the given trie to flat paths, prepending a prefix to each.""" |
| + result = {} |
| + for name, data in trie.iteritems(): |
| + if prefix: |
| + name = prefix + "/" + name |
| + |
| + if len(data) and not "results" in data: |
| + result.update(convert_trie_to_flat_paths(data, name)) |
| + else: |
| + result[name] = data |
| + |
| + return result |
| + |
| + |
| +def add_path_to_trie(path, value, trie): |
| + """Inserts a single flat directory path and associated value into a directory trie structure.""" |
| + if not "/" in path: |
| + trie[path] = value |
| + return |
| + |
| + directory, slash, rest = path.partition("/") |
| + if not directory in trie: |
| + trie[directory] = {} |
| + add_path_to_trie(rest, value, trie[directory]) |
| + |
| +def test_timings_trie(individual_test_timings): |
| + """Breaks a test name into chunks by directory and puts the test time as a value in the lowest part, e.g. |
| + foo/bar/baz.html: 1ms |
| + foo/bar/baz1.html: 3ms |
| + |
| + becomes |
| + foo: { |
| + bar: { |
| + baz.html: 1, |
| + baz1.html: 3 |
| + } |
| + } |
| + """ |
| + trie = {} |
| + for test_result in individual_test_timings: |
| + test = test_result.test_name |
| + |
| + add_path_to_trie(test, int(1000 * test_result.test_run_time), trie) |
| + |
| + return trie |
| + |
| + |
| +class TestResult(object): |
| + """A simple class that represents a single test result.""" |
| + |
| + # Test modifier constants. |
| + (NONE, FAILS, FLAKY, DISABLED) = range(4) |
| + |
| + def __init__(self, test, failed=False, elapsed_time=0): |
| + self.test_name = test |
| + self.failed = failed |
| + self.test_run_time = elapsed_time |
| + |
| + test_name = test |
| + try: |
| + test_name = test.split('.')[1] |
| + except IndexError: |
| + _log.warn("Invalid test name: %s.", test) |
| + pass |
| + |
| + if test_name.startswith('FAILS_'): |
| + self.modifier = self.FAILS |
| + elif test_name.startswith('FLAKY_'): |
| + self.modifier = self.FLAKY |
| + elif test_name.startswith('DISABLED_'): |
| + self.modifier = self.DISABLED |
| + else: |
| + self.modifier = self.NONE |
| + |
| + def fixable(self): |
| + return self.failed or self.modifier == self.DISABLED |
| + |
| + |
| +class JSONResultsGeneratorBase(object): |
| + """A JSON results generator for generic tests.""" |
| + |
| + MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG = 750 |
| + # Min time (seconds) that will be added to the JSON. |
| + MIN_TIME = 1 |
| + |
| + # Note that in non-chromium tests those chars are used to indicate |
| + # test modifiers (FAILS, FLAKY, etc) but not actual test results. |
| + PASS_RESULT = "P" |
| + SKIP_RESULT = "X" |
| + FAIL_RESULT = "F" |
| + FLAKY_RESULT = "L" |
| + NO_DATA_RESULT = "N" |
| + |
| + MODIFIER_TO_CHAR = {TestResult.NONE: PASS_RESULT, |
| + TestResult.DISABLED: SKIP_RESULT, |
| + TestResult.FAILS: FAIL_RESULT, |
| + TestResult.FLAKY: FLAKY_RESULT} |
| + |
| + VERSION = 4 |
| + VERSION_KEY = "version" |
| + RESULTS = "results" |
| + TIMES = "times" |
| + BUILD_NUMBERS = "buildNumbers" |
| + TIME = "secondsSinceEpoch" |
| + TESTS = "tests" |
| + |
| + FIXABLE_COUNT = "fixableCount" |
| + FIXABLE = "fixableCounts" |
| + ALL_FIXABLE_COUNT = "allFixableCount" |
| + |
| + RESULTS_FILENAME = "results.json" |
| + TIMES_MS_FILENAME = "times_ms.json" |
| + INCREMENTAL_RESULTS_FILENAME = "incremental_results.json" |
| + |
| + URL_FOR_TEST_LIST_JSON = "http://%s/testfile?builder=%s&name=%s&testlistjson=1&testtype=%s&master=%s" |
| + |
| + # FIXME: Remove generate_incremental_results once the reference to it in |
| + # http://src.chromium.org/viewvc/chrome/trunk/tools/build/scripts/slave/gtest_slave_utils.py |
| + # has been removed. |
| + def __init__(self, builder_name, build_name, build_number, |
| + results_file_base_path, builder_base_url, |
| + test_results_map, svn_repositories=None, |
| + test_results_server=None, |
| + test_type="", |
| + master_name="", |
| + generate_incremental_results=None): |
| + """Modifies the results.json file. Grabs it off the archive directory |
| + if it is not found locally. |
| + |
| + Args |
| + builder_name: the builder name (e.g. Webkit). |
| + build_name: the build name (e.g. webkit-rel). |
| + build_number: the build number. |
| + results_file_base_path: Absolute path to the directory containing the |
| + results json file. |
| + builder_base_url: the URL where we have the archived test results. |
| + If this is None no archived results will be retrieved. |
| + test_results_map: A dictionary that maps test_name to TestResult. |
| + svn_repositories: A (json_field_name, svn_path) pair for SVN |
| + repositories that tests rely on. The SVN revision will be |
| + included in the JSON with the given json_field_name. |
| + test_results_server: server that hosts test results json. |
| + test_type: test type string (e.g. 'layout-tests'). |
| + master_name: the name of the buildbot master. |
| + """ |
| + self._builder_name = builder_name |
| + self._build_name = build_name |
| + self._build_number = build_number |
| + self._builder_base_url = builder_base_url |
| + self._results_directory = results_file_base_path |
| + |
| + self._test_results_map = test_results_map |
| + self._test_results = test_results_map.values() |
| + |
| + self._svn_repositories = svn_repositories |
| + if not self._svn_repositories: |
| + self._svn_repositories = {} |
| + |
| + self._test_results_server = test_results_server |
| + self._test_type = test_type |
| + self._master_name = master_name |
| + |
| + self._archived_results = None |
| + |
| + def generate_json_output(self): |
| + json_object = self.get_json() |
| + if json_object: |
| + file_path = os.path.join(self._results_directory, self.INCREMENTAL_RESULTS_FILENAME) |
| + write_json(json_object, file_path) |
| + |
| + def generate_times_ms_file(self): |
| + # FIXME: rename to generate_times_ms_file. This needs to be coordinated with |
| + # changing the calls to this on the chromium build slaves. |
| + times = test_timings_trie(self._test_results_map.values()) |
| + file_path = os.path.join(self._results_directory, self.TIMES_MS_FILENAME) |
| + write_json(times, file_path) |
| + |
| + def get_json(self): |
| + """Gets the results for the results.json file.""" |
| + results_json = {} |
| + |
| + if not results_json: |
| + results_json, error = self._get_archived_json_results() |
| + if error: |
| + # If there was an error don't write a results.json |
| + # file at all as it would lose all the information on the |
| + # bot. |
| + _log.error("Archive directory is inaccessible. Not " |
| + "modifying or clobbering the results.json " |
| + "file: " + str(error)) |
| + return None |
| + |
| + builder_name = self._builder_name |
| + if results_json and builder_name not in results_json: |
| + _log.debug("Builder name (%s) is not in the results.json file." |
| + % builder_name) |
| + |
| + self._convert_json_to_current_version(results_json) |
| + |
| + if builder_name not in results_json: |
| + results_json[builder_name] = ( |
| + self._create_results_for_builder_json()) |
| + |
| + results_for_builder = results_json[builder_name] |
| + |
| + if builder_name: |
| + self._insert_generic_metadata(results_for_builder) |
| + |
| + self._insert_failure_summaries(results_for_builder) |
| + |
| + # Update the all failing tests with result type and time. |
| + tests = results_for_builder[self.TESTS] |
| + all_failing_tests = self._get_failed_test_names() |
| + all_failing_tests.update(convert_trie_to_flat_paths(tests)) |
| + |
| + for test in all_failing_tests: |
| + self._insert_test_time_and_result(test, tests) |
| + |
| + return results_json |
| + |
| + def set_archived_results(self, archived_results): |
| + self._archived_results = archived_results |
| + |
| + def upload_json_files(self, json_files): |
| + """Uploads the given json_files to the test_results_server (if the |
| + test_results_server is given).""" |
| + if not self._test_results_server: |
| + return |
| + |
| + if not self._master_name: |
| + _log.error("--test-results-server was set, but --master-name was not. Not uploading JSON files.") |
| + return |
| + |
| + _log.info("Uploading JSON files for builder: %s", self._builder_name) |
| + attrs = [("builder", self._builder_name), |
| + ("testtype", self._test_type), |
| + ("master", self._master_name)] |
| + |
| + files = [(file, os.path.join(self._results_directory, file)) |
| + for file in json_files] |
| + |
| + url = "http://%s/testfile/upload" % self._test_results_server |
| + # Set uploading timeout in case appengine server is having problems. |
| + # 120 seconds are more than enough to upload test results. |
| + uploader = _FileUploader(url, 120) |
| + try: |
| + response = uploader.upload_as_multipart_form_data(files, attrs) |
| + if response: |
| + if response.code == 200: |
| + _log.info("JSON uploaded.") |
| + else: |
| + _log.debug("JSON upload failed, %d: '%s'" % (response.code, response.read())) |
| + else: |
| + _log.error("JSON upload failed; no response returned") |
| + except Exception, err: |
| + _log.error("Upload failed: %s" % err) |
| + return |
| + |
| + |
| + def _get_test_timing(self, test_name): |
| + """Returns test timing data (elapsed time) in second |
| + for the given test_name.""" |
| + if test_name in self._test_results_map: |
| + # Floor for now to get time in seconds. |
| + return int(self._test_results_map[test_name].test_run_time) |
| + return 0 |
| + |
| + def _get_failed_test_names(self): |
| + """Returns a set of failed test names.""" |
| + return set([r.test_name for r in self._test_results if r.failed]) |
| + |
| + def _get_modifier_char(self, test_name): |
| + """Returns a single char (e.g. SKIP_RESULT, FAIL_RESULT, |
| + PASS_RESULT, NO_DATA_RESULT, etc) that indicates the test modifier |
| + for the given test_name. |
| + """ |
| + if test_name not in self._test_results_map: |
| + return self.__class__.NO_DATA_RESULT |
| + |
| + test_result = self._test_results_map[test_name] |
| + if test_result.modifier in self.MODIFIER_TO_CHAR.keys(): |
| + return self.MODIFIER_TO_CHAR[test_result.modifier] |
| + |
| + return self.__class__.PASS_RESULT |
| + |
| + def _get_result_char(self, test_name): |
| + """Returns a single char (e.g. SKIP_RESULT, FAIL_RESULT, |
| + PASS_RESULT, NO_DATA_RESULT, etc) that indicates the test result |
| + for the given test_name. |
| + """ |
| + if test_name not in self._test_results_map: |
| + return self.__class__.NO_DATA_RESULT |
| + |
| + test_result = self._test_results_map[test_name] |
| + if test_result.modifier == TestResult.DISABLED: |
| + return self.__class__.SKIP_RESULT |
| + |
| + if test_result.failed: |
| + return self.__class__.FAIL_RESULT |
| + |
| + return self.__class__.PASS_RESULT |
| + |
| + def _get_svn_revision(self, in_directory): |
| + """Returns the svn revision for the given directory. |
| + |
| + Args: |
| + in_directory: The directory where svn is to be run. |
| + """ |
| + # This is overridden in flakiness_dashboard_results_uploader.py. |
| + raise NotImplementedError() |
| + |
| + def _get_archived_json_results(self): |
| + """Download JSON file that only contains test |
| + name list from test-results server. This is for generating incremental |
| + JSON so the file generated has info for tests that failed before but |
| + pass or are skipped from current run. |
| + |
| + Returns (archived_results, error) tuple where error is None if results |
| + were successfully read. |
| + """ |
| + results_json = {} |
| + old_results = None |
| + error = None |
| + |
| + if not self._test_results_server: |
| + return {}, None |
| + |
| + results_file_url = (self.URL_FOR_TEST_LIST_JSON % |
| + (urllib2.quote(self._test_results_server), |
| + urllib2.quote(self._builder_name), |
| + self.RESULTS_FILENAME, |
| + urllib2.quote(self._test_type), |
| + urllib2.quote(self._master_name))) |
| + |
| + try: |
| + # FIXME: We should talk to the network via a Host object. |
| + results_file = urllib2.urlopen(results_file_url) |
| + info = results_file.info() |
| + old_results = results_file.read() |
| + except urllib2.HTTPError, http_error: |
| + # A non-4xx status code means the bot is hosed for some reason |
| + # and we can't grab the results.json file off of it. |
| + if (http_error.code < 400 and http_error.code >= 500): |
| + error = http_error |
| + except urllib2.URLError, url_error: |
| + error = url_error |
| + |
| + if old_results: |
| + # Strip the prefix and suffix so we can get the actual JSON object. |
| + old_results = strip_json_wrapper(old_results) |
| + |
| + try: |
| + results_json = json.loads(old_results) |
| + except: |
| + _log.debug("results.json was not valid JSON. Clobbering.") |
| + # The JSON file is not valid JSON. Just clobber the results. |
| + results_json = {} |
| + else: |
| + _log.debug('Old JSON results do not exist. Starting fresh.') |
| + results_json = {} |
| + |
| + return results_json, error |
| + |
| + def _insert_failure_summaries(self, results_for_builder): |
| + """Inserts aggregate pass/failure statistics into the JSON. |
| + This method reads self._test_results and generates |
| + FIXABLE, FIXABLE_COUNT and ALL_FIXABLE_COUNT entries. |
| + |
| + Args: |
| + results_for_builder: Dictionary containing the test results for a |
| + single builder. |
| + """ |
| + # Insert the number of tests that failed or skipped. |
| + fixable_count = len([r for r in self._test_results if r.fixable()]) |
| + self._insert_item_into_raw_list(results_for_builder, |
| + fixable_count, self.FIXABLE_COUNT) |
| + |
| + # Create a test modifiers (FAILS, FLAKY etc) summary dictionary. |
| + entry = {} |
| + for test_name in self._test_results_map.iterkeys(): |
| + result_char = self._get_modifier_char(test_name) |
| + entry[result_char] = entry.get(result_char, 0) + 1 |
| + |
| + # Insert the pass/skip/failure summary dictionary. |
| + self._insert_item_into_raw_list(results_for_builder, entry, |
| + self.FIXABLE) |
| + |
| + # Insert the number of all the tests that are supposed to pass. |
| + all_test_count = len(self._test_results) |
| + self._insert_item_into_raw_list(results_for_builder, |
| + all_test_count, self.ALL_FIXABLE_COUNT) |
| + |
| + def _insert_item_into_raw_list(self, results_for_builder, item, key): |
| + """Inserts the item into the list with the given key in the results for |
| + this builder. Creates the list if no such list exists. |
| + |
| + Args: |
| + results_for_builder: Dictionary containing the test results for a |
| + single builder. |
| + item: Number or string to insert into the list. |
| + key: Key in results_for_builder for the list to insert into. |
| + """ |
| + if key in results_for_builder: |
| + raw_list = results_for_builder[key] |
| + else: |
| + raw_list = [] |
| + |
| + raw_list.insert(0, item) |
| + raw_list = raw_list[:self.MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG] |
| + results_for_builder[key] = raw_list |
| + |
| + def _insert_item_run_length_encoded(self, item, encoded_results): |
| + """Inserts the item into the run-length encoded results. |
| + |
| + Args: |
| + item: String or number to insert. |
| + encoded_results: run-length encoded results. An array of arrays, e.g. |
| + [[3,'A'],[1,'Q']] encodes AAAQ. |
| + """ |
| + if len(encoded_results) and item == encoded_results[0][1]: |
| + num_results = encoded_results[0][0] |
| + if num_results <= self.MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG: |
| + encoded_results[0][0] = num_results + 1 |
| + else: |
| + # Use a list instead of a class for the run-length encoding since |
| + # we want the serialized form to be concise. |
| + encoded_results.insert(0, [1, item]) |
| + |
| + def _insert_generic_metadata(self, results_for_builder): |
| + """ Inserts generic metadata (such as version number, current time etc) |
| + into the JSON. |
| + |
| + Args: |
| + results_for_builder: Dictionary containing the test results for |
| + a single builder. |
| + """ |
| + self._insert_item_into_raw_list(results_for_builder, |
| + self._build_number, self.BUILD_NUMBERS) |
| + |
| + # Include SVN revisions for the given repositories. |
| + for (name, path) in self._svn_repositories: |
| + # Note: for JSON file's backward-compatibility we use 'chrome' rather |
| + # than 'chromium' here. |
| + lowercase_name = name.lower() |
| + if lowercase_name == 'chromium': |
| + lowercase_name = 'chrome' |
| + self._insert_item_into_raw_list(results_for_builder, |
| + self._get_svn_revision(path), |
| + lowercase_name + 'Revision') |
| + |
| + self._insert_item_into_raw_list(results_for_builder, |
| + int(time.time()), |
| + self.TIME) |
| + |
| + def _insert_test_time_and_result(self, test_name, tests): |
| + """ Insert a test item with its results to the given tests dictionary. |
| + |
| + Args: |
| + tests: Dictionary containing test result entries. |
| + """ |
| + |
| + result = self._get_result_char(test_name) |
| + time = self._get_test_timing(test_name) |
| + |
| + this_test = tests |
| + for segment in test_name.split("/"): |
| + if segment not in this_test: |
| + this_test[segment] = {} |
| + this_test = this_test[segment] |
| + |
| + if not len(this_test): |
| + self._populate_results_and_times_json(this_test) |
| + |
| + if self.RESULTS in this_test: |
| + self._insert_item_run_length_encoded(result, this_test[self.RESULTS]) |
| + else: |
| + this_test[self.RESULTS] = [[1, result]] |
| + |
| + if self.TIMES in this_test: |
| + self._insert_item_run_length_encoded(time, this_test[self.TIMES]) |
| + else: |
| + this_test[self.TIMES] = [[1, time]] |
| + |
| + def _convert_json_to_current_version(self, results_json): |
| + """If the JSON does not match the current version, converts it to the |
| + current version and adds in the new version number. |
| + """ |
| + if self.VERSION_KEY in results_json: |
| + archive_version = results_json[self.VERSION_KEY] |
| + if archive_version == self.VERSION: |
| + return |
| + else: |
| + archive_version = 3 |
| + |
| + # version 3->4 |
| + if archive_version == 3: |
| + num_results = len(results_json.values()) |
| + for builder, results in results_json.iteritems(): |
| + self._convert_tests_to_trie(results) |
| + |
| + results_json[self.VERSION_KEY] = self.VERSION |
| + |
| + def _convert_tests_to_trie(self, results): |
| + if not self.TESTS in results: |
| + return |
| + |
| + test_results = results[self.TESTS] |
| + test_results_trie = {} |
| + for test in test_results.iterkeys(): |
| + single_test_result = test_results[test] |
| + add_path_to_trie(test, single_test_result, test_results_trie) |
| + |
| + results[self.TESTS] = test_results_trie |
| + |
| + def _populate_results_and_times_json(self, results_and_times): |
| + results_and_times[self.RESULTS] = [] |
| + results_and_times[self.TIMES] = [] |
| + return results_and_times |
| + |
| + def _create_results_for_builder_json(self): |
| + results_for_builder = {} |
| + results_for_builder[self.TESTS] = {} |
| + return results_for_builder |
| + |
| + def _remove_items_over_max_number_of_builds(self, encoded_list): |
| + """Removes items from the run-length encoded list after the final |
| + item that exceeds the max number of builds to track. |
| + |
| + Args: |
| + encoded_results: run-length encoded results. An array of arrays, e.g. |
| + [[3,'A'],[1,'Q']] encodes AAAQ. |
| + """ |
| + num_builds = 0 |
| + index = 0 |
| + for result in encoded_list: |
| + num_builds = num_builds + result[0] |
| + index = index + 1 |
| + if num_builds > self.MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG: |
| + return encoded_list[:index] |
| + return encoded_list |
| + |
| + def _normalize_results_json(self, test, test_name, tests): |
| + """ Prune tests where all runs pass or tests that no longer exist and |
| + truncate all results to maxNumberOfBuilds. |
| + |
| + Args: |
| + test: ResultsAndTimes object for this test. |
| + test_name: Name of the test. |
| + tests: The JSON object with all the test results for this builder. |
| + """ |
| + test[self.RESULTS] = self._remove_items_over_max_number_of_builds( |
| + test[self.RESULTS]) |
| + test[self.TIMES] = self._remove_items_over_max_number_of_builds( |
| + test[self.TIMES]) |
| + |
| + is_all_pass = self._is_results_all_of_type(test[self.RESULTS], |
| + self.PASS_RESULT) |
| + is_all_no_data = self._is_results_all_of_type(test[self.RESULTS], |
| + self.NO_DATA_RESULT) |
| + max_time = max([time[1] for time in test[self.TIMES]]) |
| + |
| + # Remove all passes/no-data from the results to reduce noise and |
| + # filesize. If a test passes every run, but takes > MIN_TIME to run, |
| + # don't throw away the data. |
| + if is_all_no_data or (is_all_pass and max_time <= self.MIN_TIME): |
| + del tests[test_name] |
| + |
| + def _is_results_all_of_type(self, results, type): |
| + """Returns whether all the results are of the given type |
| + (e.g. all passes).""" |
| + return len(results) == 1 and results[0][1] == type |
| + |
| + |
| +# Left here not to break anything. |
| +class JSONResultsGenerator(JSONResultsGeneratorBase): |
| + pass |
| + |
| + |
| +class _FileUploader(object): |
| + def __init__(self, url, timeout_seconds): |
| + self._url = url |
| + self._timeout_seconds = timeout_seconds |
| + |
| + def upload_as_multipart_form_data(self, filesystem, files, attrs): |
| + file_objs = [] |
| + for filename, path in files: |
| + file_objs.append(('file', filename, filesystem.read_binary_file(path))) |
| + |
| + # FIXME: We should use the same variable names for the formal and actual |
| + # parameters. |
| + content_type, data = _encode_multipart_form_data(attrs, file_objs) |
| + return self._upload_data(content_type, data) |
| + |
| + def _upload_data(self, content_type, data): |
| + start = time.time() |
| + end = start + self._timeout_seconds |
| + while time.time() < end: |
| + try: |
| + request = urllib2.Request(self._url, data, |
| + {"Content-Type": content_type}) |
| + return urllib2.urlopen(request) |
| + except urllib2.HTTPError as e: |
| + _log.warn("Received HTTP status %s loading \"%s\". " |
| + "Retrying in 10 seconds..." % (e.code, e.filename)) |
| + time.sleep(10) |
| + |
| + |
| +def _get_mime_type(filename): |
| + return mimetypes.guess_type(filename)[0] or 'application/octet-stream' |
| + |
| + |
| +# FIXME: Rather than taking tuples, this function should take more |
| +# structured data. |
| +def _encode_multipart_form_data(fields, files): |
| + """Encode form fields for multipart/form-data. |
| + |
| + Args: |
| + fields: A sequence of (name, value) elements for regular form fields. |
| + files: A sequence of (name, filename, value) elements for data to be |
| + uploaded as files. |
| + Returns: |
| + (content_type, body) ready for httplib.HTTP instance. |
| + |
| + Source: |
| + http://code.google.com/p/rietveld/source/browse/trunk/upload.py |
| + """ |
| + BOUNDARY = '-M-A-G-I-C---B-O-U-N-D-A-R-Y-' |
| + CRLF = '\r\n' |
| + lines = [] |
| + |
| + for key, value in fields: |
| + lines.append('--' + BOUNDARY) |
| + lines.append('Content-Disposition: form-data; name="%s"' % key) |
| + lines.append('') |
| + if isinstance(value, unicode): |
| + value = value.encode('utf-8') |
| + lines.append(value) |
| + |
| + for key, filename, value in files: |
| + lines.append('--' + BOUNDARY) |
| + lines.append('Content-Disposition: form-data; name="%s"; ' |
| + 'filename="%s"' % (key, filename)) |
| + lines.append('Content-Type: %s' % _get_mime_type(filename)) |
| + lines.append('') |
| + if isinstance(value, unicode): |
| + value = value.encode('utf-8') |
| + lines.append(value) |
| + |
| + lines.append('--' + BOUNDARY + '--') |
| + lines.append('') |
| + body = CRLF.join(lines) |
| + content_type = 'multipart/form-data; boundary=%s' % BOUNDARY |
| + return content_type, body |