| Index: commit-queue/async_push.py
|
| ===================================================================
|
| --- commit-queue/async_push.py (revision 249146)
|
| +++ commit-queue/async_push.py (working copy)
|
| @@ -1,130 +0,0 @@
|
| -# Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -"""Out of band HTTP push."""
|
| -
|
| -import Queue
|
| -import json
|
| -import logging
|
| -import os
|
| -import threading
|
| -import time
|
| -import traceback
|
| -import urllib
|
| -
|
| -import errors
|
| -from verification import base
|
| -
|
| -
|
| -class AsyncPushNoop(object):
|
| - url = 'http://localhost'
|
| - def close(self):
|
| - pass
|
| -
|
| - def send(self, pending, packet):
|
| - pass
|
| -
|
| - @staticmethod
|
| - def _package(pending, packet):
|
| - data = {
|
| - 'done': pending.get_state() not in (base.PROCESSING, base.IGNORED),
|
| - 'issue': pending.issue,
|
| - 'owner': pending.owner,
|
| - 'patchset': pending.patchset,
|
| - 'timestamp': time.time(),
|
| - }
|
| - if packet:
|
| - data.update(packet)
|
| - return data
|
| -
|
| -
|
| -class AsyncPushStore(AsyncPushNoop):
|
| - """Saves all the events into workdir/events.json for later analysis.
|
| -
|
| - Thread-safe.
|
| - """
|
| - def __init__(self):
|
| - super(AsyncPushStore, self).__init__()
|
| - self.lock = threading.Lock()
|
| - self.queue = []
|
| -
|
| - def close(self):
|
| - with self.lock:
|
| - with open(os.path.join('workdir', 'events.json'), 'w') as f:
|
| - json.dump(self.queue, f, indent=2)
|
| -
|
| - def send(self, pending, packet):
|
| - with self.lock:
|
| - self.queue.append(self._package(pending, packet))
|
| -
|
| -
|
| -class AsyncPush(AsyncPushNoop):
|
| - """Sends HTTP Post asynchronously to the tree status application.
|
| -
|
| - This object uses a background worker thread, and is thread-safe.
|
| - """
|
| - _TERMINATE = object()
|
| -
|
| - def __init__(self, url, password, resource='/receiver'):
|
| - super(AsyncPush, self).__init__()
|
| - assert url
|
| - assert password
|
| - self.url = url
|
| - self.resource = resource
|
| - self.password = password
|
| - self.queue = Queue.Queue()
|
| - self.thread = threading.Thread(target=self._worker_thread)
|
| - self.thread.daemon = True
|
| - self.thread.start()
|
| -
|
| - def close(self):
|
| - self.queue.put(self._TERMINATE)
|
| - self.thread.join()
|
| -
|
| - def send(self, pending, packet):
|
| - """Queues a packet."""
|
| - logging.debug('For issue %d, queuing for send: %s', pending.issue, packet)
|
| - self.queue.put(self._package(pending, packet))
|
| -
|
| - def _get_items(self):
|
| - """Waits for an item to be queued and returns up to 10 next items if queued
|
| - fast enough.
|
| - """
|
| - items = [self.queue.get()]
|
| - try:
|
| - for _ in range(9):
|
| - items.append(self.queue.get_nowait())
|
| - except Queue.Empty:
|
| - pass
|
| - return items
|
| -
|
| - def _worker_thread(self):
|
| - """Sends the packets in a loop through HTTP POST."""
|
| - params = {
|
| - 'Content-type': 'application/x-www-form-urlencoded',
|
| - 'Accept': 'text/plain'
|
| - }
|
| - done = False
|
| - try:
|
| - while not done:
|
| - items = self._get_items()
|
| - if self._TERMINATE in items:
|
| - done = True
|
| - logging.debug('Worker thread exiting')
|
| - items.remove(self._TERMINATE)
|
| - url = self.url + self.resource
|
| - logging.debug('Sending %d items to %s: %r', len(items), url, items)
|
| - try:
|
| - data = [('p', json.dumps(item)) for item in items]
|
| - data.append(('password', self.password))
|
| - urllib.urlopen(url, urllib.urlencode(data), params).read()
|
| - except IOError as e:
|
| - logging.error(e)
|
| - for item in items:
|
| - self.queue.put(item)
|
| - if not done:
|
| - time.sleep(1)
|
| - # Don't retry if done.
|
| - except Exception as e:
|
| - traceback.print_exc()
|
| - errors.send_stack(e)
|
|
|