| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 """Module contains communication methods between cbuildbot instances.""" | |
| 6 | |
| 7 import Queue | |
| 8 import SocketServer | |
| 9 import os | |
| 10 import socket | |
| 11 import sys | |
| 12 import time | |
| 13 | |
| 14 sys.path.append(os.path.join(os.path.dirname(__file__), '../lib')) | |
| 15 from cros_build_lib import Info, Warning, RunCommand | |
| 16 | |
# Port on which a slave listens for commands from the master.
_COMM_PORT = 32890
# Maximum number of bytes read from a TCP socket in a single recv call.
_BUFFER = 4096
# Seconds either end waits between checks for a new status.
_HEARTBEAT_TIMEOUT = 60
# Seconds to keep waiting before giving up and assuming failure.
_MAX_TIMEOUT = 30 * 60

# Commands - sent from the master to a slave.

# Ask the slave to report whether its build completed or failed.
_COMMAND_CHECK_STATUS = 'check-status'

# Return statuses - results of commands sent to slaves (self-explanatory).
_STATUS_COMMAND_REJECTED = 'rejected'
_STATUS_TIMEOUT = 'timeout'
# Public for cbuildbot.
STATUS_BUILD_COMPLETE = 'complete'
STATUS_BUILD_FAILED = 'failure'

# Global single-slot queues used to communicate with the server.
_status_queue = Queue.Queue(maxsize=1)
_receive_queue = Queue.Queue(maxsize=1)
_command_queue = Queue.Queue(maxsize=1)
| 42 | |
class _TCPServerWithReuse(SocketServer.TCPServer):
  """TCPServer that allows re-use of its address and times out on its socket.

  Keyword arguments:
  address -- (host, port) tuple to bind to.
  handler -- request handler class instantiated for each request.
  timeout -- timeout in seconds applied to the listening socket.

  """
  # Set on this subclass only. Assigning to SocketServer.TCPServer itself
  # would silently change every other TCPServer in the process.
  allow_reuse_address = True

  def __init__(self, address, handler, timeout):
    SocketServer.TCPServer.__init__(self, address, handler)
    self.socket.settimeout(timeout)
| 50 | |
| 51 | |
class _SlaveCommandHandler(SocketServer.BaseRequestHandler):
  """Handles requests from a master pre-flight-queue bot."""

  def _HandleCommand(self, command, args):
    """Handles command and returns status for master.

    The command is only accepted when it matches the one queued on
    _command_queue. An accepted command has its args placed on
    _receive_queue.

    Keyword arguments:
    command -- command name received from the master.
    args -- argument string received with the command.

    Returns the queued slave status for _COMMAND_CHECK_STATUS,
    _STATUS_COMMAND_REJECTED when the command is not the expected one, and
    None for any other accepted command.

    """
    Info('(Slave) - Received command %s with args %s' % (command, args))
    slave_status = None
    took_status = False
    try:
      # Never block here: a blocked handler would stall handle_request() and
      # with it the caller's timeout loop.
      command_to_expect = _command_queue.get_nowait()
      # Check status also adds an entry on the status queue.
      if command_to_expect == _COMMAND_CHECK_STATUS:
        slave_status = _status_queue.get_nowait()
        took_status = True
    except Queue.Empty:
      Warning('(Slave) - Rejecting command %s. Nothing is queued for it.' %
              command)
      return _STATUS_COMMAND_REJECTED
    # Safety check to make sure the server is in a good state.
    if command_to_expect != command:
      Warning(
          '(Slave) - Rejecting command %s. Was expecting %s.' % (command,
          command_to_expect))
      # Put back what was taken so a later, valid command can still be
      # answered.
      _command_queue.put(command_to_expect)
      if took_status:
        _status_queue.put(slave_status)
      return _STATUS_COMMAND_REJECTED
    # Give slave command with optional args.
    _receive_queue.put(args)
    if command == _COMMAND_CHECK_STATUS:
      # Returns status to send.
      return slave_status
    return None

  def handle(self):
    """Overridden. Handles commands sent from master.

    Expects '<command>\\n<args>\\n' on the socket. Everything after the first
    newline is treated as args, so missing or multi-line args do not raise.

    """
    data = self.request.recv(_BUFFER).strip()
    command, _, args = data.partition('\n')
    response = self._HandleCommand(command, args)
    # Commands without a status to report send nothing back.
    if response is not None:
      # sendall keeps writing until the whole response has been sent.
      self.request.sendall(response)
| 80 | |
| 81 | |
def _GetSlaveNames(configuration):
  """Returns a list of hostnames of the important slaves.

  A configuration entry counts as an important slave when its 'master' value
  is false and its 'important' value is true.

  Keyword arguments:
  configuration -- dictionary whose values are dictionaries with 'master',
                   'important' and 'hostname' keys.

  """
  return [slave_config['hostname']
          for slave_config in configuration.values()
          if not slave_config['master'] and slave_config['important']]
| 90 | |
| 91 | |
def _SendCommand(hostname, command, args):
  """Returns response from host or _STATUS_TIMEOUT on error.

  Sends '<command>\\n<args>\\n' to hostname on _COMM_PORT and reads a single
  reply of at most _BUFFER bytes.

  Keyword arguments:
  hostname -- host to connect to.
  command -- command name to send.
  args -- argument string sent along with the command.

  """
  data = '%s\n%s\n' % (command, args)
  Info('(Master) - Sending %s %s to %s' % (command, args, hostname))

  # Create a socket (SOCK_STREAM means a TCP socket).
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  # Bound every blocking call so an unresponsive host cannot stall the caller
  # forever.
  sock.settimeout(_HEARTBEAT_TIMEOUT)

  try:
    # Connect to server and send data; sendall writes the whole message.
    sock.connect((hostname, _COMM_PORT))
    sock.sendall(data)

    # Receive data from the server and shut down.
    received = sock.recv(_BUFFER)
  except socket.error:
    # Covers connection, name-resolution and timeout errors, while letting
    # KeyboardInterrupt and SystemExit propagate.
    received = _STATUS_TIMEOUT
  finally:
    sock.close()
  return received
| 112 | |
| 113 | |
def _CheckSlavesLeftStatus(slaves_to_check):
  """Polls each remaining slave once; returns False if one reports failure.

  Slaves that answer STATUS_BUILD_COMPLETE are removed from slaves_to_check
  (the list is modified in place) once the whole list has been polled. A
  STATUS_BUILD_FAILED answer ends the poll immediately and leaves the list
  untouched. Any other answer keeps the slave in the list.

  Keyword arguments:
  slaves_to_check -- List of hostnames to check.

  """
  finished = []
  for hostname in slaves_to_check:
    answer = _SendCommand(hostname, _COMMAND_CHECK_STATUS, 'empty')
    if answer == STATUS_BUILD_COMPLETE:
      Info('(Master) - Slave %s completed' % hostname)
      finished.append(hostname)
    elif answer == STATUS_BUILD_FAILED:
      Warning('(Master) - Slave %s failed' % hostname)
      return False
  # Removal is deferred so the list is not mutated while being iterated.
  for hostname in finished:
    slaves_to_check.remove(hostname)
  return True
| 136 | |
| 137 | |
def HaveSlavesCompleted(configuration):
  """Returns True if every important slave reported a completed build.

  Polls the slaves every '_HEARTBEAT_TIMEOUT' seconds and stops as soon as
  all of them have completed, one of them reports a failure, or the time
  spent sleeping between polls reaches '_MAX_TIMEOUT'.

  Keyword arguments:
  configuration -- configuration dictionary for slaves.

  """
  pending = _GetSlaveNames(configuration)
  waited = 0
  while pending and waited < _MAX_TIMEOUT:
    if not _CheckSlavesLeftStatus(pending):
      # A slave failed; it is still in pending, so the result is False.
      break
    if pending:
      time.sleep(_HEARTBEAT_TIMEOUT)
      waited += _HEARTBEAT_TIMEOUT
  return not pending
| 157 | |
| 158 | |
def PublishStatus(status):
  """Publishes status and Returns True if master received it.

  This call is blocking until either the master pre-flight-queue bot picks
  up the status, or a '_MAX_TIMEOUT' has passed.

  Keyword arguments:
  status -- should be a string and one of STATUS_BUILD_.*.

  """
  # Clean up queues. They hold a single item each, so anything left over from
  # an earlier call would make a later put block or be mistaken for a reply.
  for stale_queue in (_command_queue, _status_queue, _receive_queue):
    try:
      stale_queue.get_nowait()
    except Queue.Empty:
      pass

  _command_queue.put(_COMMAND_CHECK_STATUS)
  _status_queue.put(status)
  # NOTE(review): binding to 'localhost' presumably only accepts connections
  # made from this machine -- confirm that a master on another host can reach
  # this server.
  server = _TCPServerWithReuse(('localhost', _COMM_PORT),
                               _SlaveCommandHandler, _HEARTBEAT_TIMEOUT)
  timeout = 0
  response = None
  try:
    while response is None and timeout < _MAX_TIMEOUT:
      server.handle_request()
      try:
        response = _receive_queue.get_nowait()
      except Queue.Empty:
        Info('(Slave) - Waiting for master to accept %s' % status)
        timeout += _HEARTBEAT_TIMEOUT
  except Exception as e:
    Warning('%s' % e)
  finally:
    # Always release the listening socket, even on KeyboardInterrupt.
    server.server_close()
  return response is not None
| OLD | NEW |