Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Unified Diff: client/tests/httpserver_mock.py

Issue 2037253002: run_isolated.py: install CIPD packages (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/tests/httpserver_mock.py
diff --git a/client/tests/httpserver_mock.py b/client/tests/httpserver_mock.py
new file mode 100644
index 0000000000000000000000000000000000000000..d4d2cc5739a8065a89711dd9f3d56f0182b1338b
--- /dev/null
+++ b/client/tests/httpserver_mock.py
@@ -0,0 +1,74 @@
+# Copyright 2014 The LUCI Authors. All rights reserved.
+# Use of this source code is governed under the Apache License, Version 2.0
+# that can be found in the LICENSE file.
+
+import BaseHTTPServer
+import json
+import logging
+import threading
+import urllib2
+
+
+class MockHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ def _json(self, data):
+ """Sends a JSON response."""
+ self.send_response(200)
+ self.send_header('Content-type', 'application/json')
+ self.end_headers()
+ json.dump(data, self.wfile)
+
+ def _octet_stream(self, data):
+ """Sends a binary response."""
+ self.send_response(200)
+ self.send_header('Content-type', 'application/octet-stream')
+ self.end_headers()
+ self.wfile.write(data)
+
+ def _read_body(self):
+ """Reads the request body."""
+ return self.rfile.read(int(self.headers['Content-Length']))
+
+ def _drop_body(self):
+ """Reads the request body."""
+ size = int(self.headers['Content-Length'])
+ while size:
+ chunk = min(4096, size)
+ self.rfile.read(chunk)
+ size -= chunk
+
+ def log_message(self, fmt, *args):
+ logging.info(
+ '%s - - [%s] %s', self.address_string(), self.log_date_time_string(),
+ fmt % args)
+
+
+class MockServer(object):
+ _HANDLER_CLS = None
+
+ def __init__(self):
+ self._closed = False
+ self._server = BaseHTTPServer.HTTPServer(
+ ('127.0.0.1', 0), self._HANDLER_CLS)
+ self._server.url = self.url = 'http://localhost:%d' % (
+ self._server.server_port)
+ self._thread = threading.Thread(target=self._run, name='httpd')
+ self._thread.daemon = True
+ self._thread.start()
+ logging.info('%s', self.url)
+
+ def close(self):
+ self.close_start()
+ self.close_end()
+
+ def close_start(self):
+ assert not self._closed
+ self._closed = True
+ urllib2.urlopen(self.url + '/on/quit')
+
+ def close_end(self):
+ assert self._closed
+ self._thread.join()
+
+ def _run(self):
+ while not self._closed:
+ self._server.handle_request()

Powered by Google App Engine
This is Rietveld 408576698