Index: bin/cbuildbot_comm.py |
diff --git a/bin/cbuildbot_comm.py b/bin/cbuildbot_comm.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..7dfc266c4c139e6452f20a4fbf4e0b401ce39348 |
--- /dev/null |
+++ b/bin/cbuildbot_comm.py |
@@ -0,0 +1,195 @@ |
+# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+"""Module contains communication methods between cbuildbot instances.""" |
+ |
+import Queue |
+import SocketServer |
+import socket |
+import sys |
+import time |
+ |
+from cbuildbot import RunCommand |
+ |
+# Communication port for master to slave communication. |
+_COMM_PORT = 32890 |
+# TCP Buffer Size. |
+_BUFFER = 4096 |
+# Timeout between checks for new status by either end. |
+_HEARTBEAT_TIMEOUT = 60 # in sec. |
+# Max Timeout to wait before assuming failure. |
+_MAX_TIMEOUT = 30 * 60 # in sec. |
+ |
+# Commands - sent to slave from master. |
+ |
+# Report whether you have completed or failed building. |
+_COMMAND_CHECK_STATUS = 'check-status' |
+ |
+# Return status - response to commands from slaves (self.explanatory) |
+_STATUS_COMMAND_REJECTED = 'rejected' |
+_STATUS_TIMEOUT = 'timeout' |
+# Public for cbuildbot. |
+STATUS_BUILD_COMPLETE = 'complete' |
+STATUS_BUILD_FAILED = 'failure' |
+ |
+# Global queues to communicate with server. |
+_status_queue = Queue.Queue(1) |
+_receive_queue = Queue.Queue(1) |
+_command_queue = Queue.Queue(1) |
+ |
+class _TCPServerWithReuse(SocketServer.TCPServer): |
+ """TCPServer that allows re-use of socket and timed out sockets.""" |
+ SocketServer.TCPServer.allow_reuse_address = True |
+ |
+ def __init__(self, address, handler, timeout): |
+ SocketServer.TCPServer.__init__(self, address, handler) |
+ self.socket.settimeout(timeout) |
+ |
+ |
+class _SlaveCommandHandler(SocketServer.BaseRequestHandler): |
+ """Handles requests from a master pre-flight-queue bot.""" |
+ |
+ def _HandleCommand(self, command, args): |
+ """Handles command and returns status for master.""" |
+ print >> sys.stderr, ('(Slave) - Received command %s with args %s' % |
+ (command, args)) |
+ command_to_expect = _command_queue.get() |
+ # Check status also adds an entry on the status queue. |
+ if command_to_expect == _COMMAND_CHECK_STATUS: |
+ slave_status = _status_queue.get() |
+ # Safety check to make sure the server is in a good state. |
+ if command_to_expect != command: |
+ print >> sys.stderr, ( |
+ '(Slave) - Rejecting command %s. Was expecting %s.' % (command, |
+ command_to_expect)) |
+ return _STATUS_COMMAND_REJECTED |
+ # Give slave command with optional args. |
+ _receive_queue.put(args) |
+ if command == _COMMAND_CHECK_STATUS: |
+ # Returns status to send. |
+ return slave_status |
+ |
+ def handle(self): |
+ """Overriden. Handles commands sent from master.""" |
+ data = self.request.recv(_BUFFER).strip() |
+ (command, args) = data.split('\n') |
+ response = self._HandleCommand(command, args) |
+ self.request.send(response) |
+ |
+ |
+def _GetSlaveNames(configuration): |
+ """Returns an array of slave hostnames that are important.""" |
+ slaves = [] |
+ for slave_config in configuration.items(): |
+ if (not slave_config[1]['master'] and |
+ slave_config[1]['important']): |
+ slaves.append(slave_config[1]['hostname']) |
+ return slaves |
+ |
+ |
+def _SendCommand(hostname, command, args): |
+ """Returns response from host or _STATUS_TIMEOUT on error.""" |
+ data = '%s\n%s\n' % (command, args) |
+ print '(Master) - Sending %s %s to %s' % (command, args, hostname) |
+ |
+ # Create a socket (SOCK_STREAM means a TCP socket). |
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
+ |
+ try: |
+ # Connect to server and send data |
+ sock.connect((hostname, _COMM_PORT)) |
+ sock.send(data) |
+ |
+ # Receive data from the server and shut down. |
+ received = sock.recv(_BUFFER) |
+ except: |
+ received = _STATUS_TIMEOUT |
+ finally: |
+ sock.close() |
+ return received |
+ |
+ |
+def _CheckSlavesLeftStatus(slaves_to_check): |
+ """Returns True if remaining slaves have completed. |
+ |
+ Once a slave reports STATUS_BUILD_COMPLETE, removes slave from list. Returns |
+ True as long as no slave reports STATUS_BUILD_FAILED. |
+ |
+ Keyword arguments: |
+ slaves_to_check -- Array of hostnames to check. |
+ |
+ """ |
+ slaves_to_remove = [] |
+ for slave in slaves_to_check: |
+ status = _SendCommand(slave, _COMMAND_CHECK_STATUS, 'empty') |
+ if status == STATUS_BUILD_FAILED: |
+ print >> sys.stderr, '(Master) - Slave %s failed' % slave |
+ return False |
+ elif status == STATUS_BUILD_COMPLETE: |
+ print >> sys.stderr, '(Master) - Slave %s completed' % slave |
+ slaves_to_remove.append(slave) |
+ for slave in slaves_to_remove: |
+ slaves_to_check.remove(slave) |
+ return True |
+ |
+ |
+def HaveSlavesCompleted(configuration): |
+ """Returns True if all other slaves have succeeded. |
+ |
+ Checks other slaves status until either '_MAX_TIMEOUT' has passed, |
+ at least one slaves reports a failure, or all slaves report success. |
+ |
+ Keyword arguments: |
+ configuration -- configuration dictionary for slaves. |
+ |
+ """ |
+ not_failed = True |
+ slaves_to_check = _GetSlaveNames(configuration) |
+ timeout = 0 |
+ while slaves_to_check and not_failed and timeout < _MAX_TIMEOUT: |
+ not_failed = _CheckSlavesLeftStatus(slaves_to_check) |
+ if slaves_to_check and not_failed: |
+ time.sleep(_HEARTBEAT_TIMEOUT) |
+ timeout += _HEARTBEAT_TIMEOUT |
+ return len(slaves_to_check) == 0 |
+ |
+ |
+def PublishStatus(status): |
+ """Publishes status and Returns True if master received it. |
+ |
+ This call is blocking until either the master pre-flight-queue bot picks |
+ up the status, or a '_MAX_TIMEOUT' has passed. |
+ |
+ Keyword arguments: |
+ status -- should be a string and one of STATUS_BUILD_.*. |
+ |
+ """ |
+ # Clean up queues. |
+ try: |
+ _command_queue.get_nowait() |
+ except Queue.Empty: pass |
+ try: |
+ _status_queue.get_nowait() |
+ except Queue.Empty: pass |
+ |
+ _command_queue.put(_COMMAND_CHECK_STATUS) |
+ _status_queue.put(status) |
+ server = _TCPServerWithReuse(('localhost', _COMM_PORT), |
+ _SlaveCommandHandler, _HEARTBEAT_TIMEOUT) |
+ timeout = 0 |
+ response = None |
+ try: |
+ while not response and timeout < _MAX_TIMEOUT: |
+ server.handle_request() |
+ try: |
+ response = _receive_queue.get_nowait() |
+ except Queue.Empty: |
+ print >> sys.stderr, ('(Slave) - Waiting for master to accept %s' % ( |
+ status)) |
+ timeout += _HEARTBEAT_TIMEOUT |
+ response = None |
+ except Exception, e: |
+ print >> sys.stderr, '%s' % e |
+ server.server_close() |
+ return response != None |