OLD | NEW |
| (Empty) |
1 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """Module contains communication methods between cbuildbot instances.""" | |
6 | |
7 import Queue | |
8 import SocketServer | |
9 import os | |
10 import socket | |
11 import sys | |
12 import time | |
13 | |
14 sys.path.append(os.path.join(os.path.dirname(__file__), '../lib')) | |
15 from cros_build_lib import Info, Warning, RunCommand | |
16 | |
17 # Communication port for master to slave communication. | |
18 _COMM_PORT = 32890 | |
19 # TCP Buffer Size. | |
20 _BUFFER = 4096 | |
21 # Timeout between checks for new status by either end. | |
22 _HEARTBEAT_TIMEOUT = 60 # in sec. | |
23 # Max Timeout to wait before assuming failure. | |
24 _MAX_TIMEOUT = 30 * 60 # in sec. | |
25 | |
26 # Commands - sent to slave from master. | |
27 | |
28 # Report whether you have completed or failed building. | |
29 _COMMAND_CHECK_STATUS = 'check-status' | |
30 | |
31 # Return status - response to commands from slaves (self.explanatory) | |
32 _STATUS_COMMAND_REJECTED = 'rejected' | |
33 _STATUS_TIMEOUT = 'timeout' | |
34 # Public for cbuildbot. | |
35 STATUS_BUILD_COMPLETE = 'complete' | |
36 STATUS_BUILD_FAILED = 'failure' | |
37 | |
38 # Global queues to communicate with server. | |
39 _status_queue = Queue.Queue(1) | |
40 _receive_queue = Queue.Queue(1) | |
41 _command_queue = Queue.Queue(1) | |
42 | |
43 class _TCPServerWithReuse(SocketServer.TCPServer): | |
44 """TCPServer that allows re-use of socket and timed out sockets.""" | |
45 SocketServer.TCPServer.allow_reuse_address = True | |
46 | |
47 def __init__(self, address, handler, timeout): | |
48 SocketServer.TCPServer.__init__(self, address, handler) | |
49 self.socket.settimeout(timeout) | |
50 | |
51 | |
52 class _SlaveCommandHandler(SocketServer.BaseRequestHandler): | |
53 """Handles requests from a master pre-flight-queue bot.""" | |
54 | |
55 def _HandleCommand(self, command, args): | |
56 """Handles command and returns status for master.""" | |
57 Info('(Slave) - Received command %s with args %s' % (command, args)) | |
58 command_to_expect = _command_queue.get() | |
59 # Check status also adds an entry on the status queue. | |
60 if command_to_expect == _COMMAND_CHECK_STATUS: | |
61 slave_status = _status_queue.get() | |
62 # Safety check to make sure the server is in a good state. | |
63 if command_to_expect != command: | |
64 Warning( | |
65 '(Slave) - Rejecting command %s. Was expecting %s.' % (command, | |
66 command_to_expect)) | |
67 return _STATUS_COMMAND_REJECTED | |
68 # Give slave command with optional args. | |
69 _receive_queue.put(args) | |
70 if command == _COMMAND_CHECK_STATUS: | |
71 # Returns status to send. | |
72 return slave_status | |
73 | |
74 def handle(self): | |
75 """Overriden. Handles commands sent from master.""" | |
76 data = self.request.recv(_BUFFER).strip() | |
77 (command, args) = data.split('\n') | |
78 response = self._HandleCommand(command, args) | |
79 self.request.send(response) | |
80 | |
81 | |
82 def _GetSlaveNames(configuration): | |
83 """Returns an array of slave hostnames that are important.""" | |
84 slaves = [] | |
85 for slave_config in configuration.items(): | |
86 if (not slave_config[1]['master'] and | |
87 slave_config[1]['important']): | |
88 slaves.append(slave_config[1]['hostname']) | |
89 return slaves | |
90 | |
91 | |
92 def _SendCommand(hostname, command, args): | |
93 """Returns response from host or _STATUS_TIMEOUT on error.""" | |
94 data = '%s\n%s\n' % (command, args) | |
95 Info('(Master) - Sending %s %s to %s' % (command, args, hostname)) | |
96 | |
97 # Create a socket (SOCK_STREAM means a TCP socket). | |
98 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
99 | |
100 try: | |
101 # Connect to server and send data | |
102 sock.connect((hostname, _COMM_PORT)) | |
103 sock.send(data) | |
104 | |
105 # Receive data from the server and shut down. | |
106 received = sock.recv(_BUFFER) | |
107 except: | |
108 received = _STATUS_TIMEOUT | |
109 finally: | |
110 sock.close() | |
111 return received | |
112 | |
113 | |
114 def _CheckSlavesLeftStatus(slaves_to_check): | |
115 """Returns True if remaining slaves have completed. | |
116 | |
117 Once a slave reports STATUS_BUILD_COMPLETE, removes slave from list. Returns | |
118 True as long as no slave reports STATUS_BUILD_FAILED. | |
119 | |
120 Keyword arguments: | |
121 slaves_to_check -- Array of hostnames to check. | |
122 | |
123 """ | |
124 slaves_to_remove = [] | |
125 for slave in slaves_to_check: | |
126 status = _SendCommand(slave, _COMMAND_CHECK_STATUS, 'empty') | |
127 if status == STATUS_BUILD_FAILED: | |
128 Warning('(Master) - Slave %s failed' % slave) | |
129 return False | |
130 elif status == STATUS_BUILD_COMPLETE: | |
131 Info('(Master) - Slave %s completed' % slave) | |
132 slaves_to_remove.append(slave) | |
133 for slave in slaves_to_remove: | |
134 slaves_to_check.remove(slave) | |
135 return True | |
136 | |
137 | |
138 def HaveSlavesCompleted(configuration): | |
139 """Returns True if all other slaves have succeeded. | |
140 | |
141 Checks other slaves status until either '_MAX_TIMEOUT' has passed, | |
142 at least one slaves reports a failure, or all slaves report success. | |
143 | |
144 Keyword arguments: | |
145 configuration -- configuration dictionary for slaves. | |
146 | |
147 """ | |
148 not_failed = True | |
149 slaves_to_check = _GetSlaveNames(configuration) | |
150 timeout = 0 | |
151 while slaves_to_check and not_failed and timeout < _MAX_TIMEOUT: | |
152 not_failed = _CheckSlavesLeftStatus(slaves_to_check) | |
153 if slaves_to_check and not_failed: | |
154 time.sleep(_HEARTBEAT_TIMEOUT) | |
155 timeout += _HEARTBEAT_TIMEOUT | |
156 return len(slaves_to_check) == 0 | |
157 | |
158 | |
159 def PublishStatus(status): | |
160 """Publishes status and Returns True if master received it. | |
161 | |
162 This call is blocking until either the master pre-flight-queue bot picks | |
163 up the status, or a '_MAX_TIMEOUT' has passed. | |
164 | |
165 Keyword arguments: | |
166 status -- should be a string and one of STATUS_BUILD_.*. | |
167 | |
168 """ | |
169 # Clean up queues. | |
170 try: | |
171 _command_queue.get_nowait() | |
172 except Queue.Empty: pass | |
173 try: | |
174 _status_queue.get_nowait() | |
175 except Queue.Empty: pass | |
176 | |
177 _command_queue.put(_COMMAND_CHECK_STATUS) | |
178 _status_queue.put(status) | |
179 server = _TCPServerWithReuse(('localhost', _COMM_PORT), | |
180 _SlaveCommandHandler, _HEARTBEAT_TIMEOUT) | |
181 timeout = 0 | |
182 response = None | |
183 try: | |
184 while not response and timeout < _MAX_TIMEOUT: | |
185 server.handle_request() | |
186 try: | |
187 response = _receive_queue.get_nowait() | |
188 except Queue.Empty: | |
189 Info('(Slave) - Waiting for master to accept %s' % status) | |
190 timeout += _HEARTBEAT_TIMEOUT | |
191 response = None | |
192 except Exception, e: | |
193 Warning('%s' % e) | |
194 server.server_close() | |
195 return response != None | |
OLD | NEW |