Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Side by Side Diff: bin/cbuildbot_comm.py

Issue 3165052: Add ability for cbuildbot master to synchronize with cbuildbot slaves. (Closed) Base URL: http://src.chromium.org/git/crosutils.git
Patch Set: Remove extra dep Created 10 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « bin/cbuildbot.py ('k') | bin/cbuildbot_comm_unittest.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Module contains communication methods between cbuildbot instances."""
6
7 import Queue
8 import SocketServer
9 import socket
10 import sys
11 import time
12
13 from cbuildbot import RunCommand
14
15 # Communication port for master to slave communication.
16 _COMM_PORT = 32890
17 # TCP Buffer Size.
18 _BUFFER = 4096
19 # Timeout between checks for new status by either end.
20 _HEARTBEAT_TIMEOUT = 60 # in sec.
21 # Max Timeout to wait before assuming failure.
22 _MAX_TIMEOUT = 30 * 60 # in sec.
23
24 # Commands - sent to slave from master.
25
26 # Report whether you have completed or failed building.
27 _COMMAND_CHECK_STATUS = 'check-status'
28
29 # Return status - response to commands from slaves (self.explanatory)
30 _STATUS_COMMAND_REJECTED = 'rejected'
31 _STATUS_TIMEOUT = 'timeout'
32 # Public for cbuildbot.
33 STATUS_BUILD_COMPLETE = 'complete'
34 STATUS_BUILD_FAILED = 'failure'
35
36 # Global queues to communicate with server.
37 _status_queue = Queue.Queue(1)
38 _receive_queue = Queue.Queue(1)
39 _command_queue = Queue.Queue(1)
40
41 class _TCPServerWithReuse(SocketServer.TCPServer):
42 """TCPServer that allows re-use of socket and timed out sockets."""
43 SocketServer.TCPServer.allow_reuse_address = True
44
45 def __init__(self, address, handler, timeout):
46 SocketServer.TCPServer.__init__(self, address, handler)
47 self.socket.settimeout(timeout)
48
49
50 class _SlaveCommandHandler(SocketServer.BaseRequestHandler):
51 """Handles requests from a master pre-flight-queue bot."""
52
53 def _HandleCommand(self, command, args):
54 """Handles command and returns status for master."""
55 print >> sys.stderr, ('(Slave) - Received command %s with args %s' %
56 (command, args))
57 command_to_expect = _command_queue.get()
58 # Check status also adds an entry on the status queue.
59 if command_to_expect == _COMMAND_CHECK_STATUS:
60 slave_status = _status_queue.get()
61 # Safety check to make sure the server is in a good state.
62 if command_to_expect != command:
63 print >> sys.stderr, (
64 '(Slave) - Rejecting command %s. Was expecting %s.' % (command,
65 command_to_expect))
66 return _STATUS_COMMAND_REJECTED
67 # Give slave command with optional args.
68 _receive_queue.put(args)
69 if command == _COMMAND_CHECK_STATUS:
70 # Returns status to send.
71 return slave_status
72
73 def handle(self):
74 """Overriden. Handles commands sent from master."""
75 data = self.request.recv(_BUFFER).strip()
76 (command, args) = data.split('\n')
77 response = self._HandleCommand(command, args)
78 self.request.send(response)
79
80
81 def _GetSlaveNames(configuration):
82 """Returns an array of slave hostnames that are important."""
83 slaves = []
84 for slave_config in configuration.items():
85 if (not slave_config[1]['master'] and
86 slave_config[1]['important']):
87 slaves.append(slave_config[1]['hostname'])
88 return slaves
89
90
91 def _SendCommand(hostname, command, args):
92 """Returns response from host or _STATUS_TIMEOUT on error."""
93 data = '%s\n%s\n' % (command, args)
94 print '(Master) - Sending %s %s to %s' % (command, args, hostname)
95
96 # Create a socket (SOCK_STREAM means a TCP socket).
97 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
98
99 try:
100 # Connect to server and send data
101 sock.connect((hostname, _COMM_PORT))
102 sock.send(data)
103
104 # Receive data from the server and shut down.
105 received = sock.recv(_BUFFER)
106 except:
107 received = _STATUS_TIMEOUT
108 finally:
109 sock.close()
110 return received
111
112
113 def _CheckSlavesLeftStatus(slaves_to_check):
114 """Returns True if remaining slaves have completed.
115
116 Once a slave reports STATUS_BUILD_COMPLETE, removes slave from list. Returns
117 True as long as no slave reports STATUS_BUILD_FAILED.
118
119 Keyword arguments:
120 slaves_to_check -- Array of hostnames to check.
121
122 """
123 slaves_to_remove = []
124 for slave in slaves_to_check:
125 status = _SendCommand(slave, _COMMAND_CHECK_STATUS, 'empty')
126 if status == STATUS_BUILD_FAILED:
127 print >> sys.stderr, '(Master) - Slave %s failed' % slave
128 return False
129 elif status == STATUS_BUILD_COMPLETE:
130 print >> sys.stderr, '(Master) - Slave %s completed' % slave
131 slaves_to_remove.append(slave)
132 for slave in slaves_to_remove:
133 slaves_to_check.remove(slave)
134 return True
135
136
137 def HaveSlavesCompleted(configuration):
138 """Returns True if all other slaves have succeeded.
139
140 Checks other slaves status until either '_MAX_TIMEOUT' has passed,
141 at least one slaves reports a failure, or all slaves report success.
142
143 Keyword arguments:
144 configuration -- configuration dictionary for slaves.
145
146 """
147 not_failed = True
148 slaves_to_check = _GetSlaveNames(configuration)
149 timeout = 0
150 while slaves_to_check and not_failed and timeout < _MAX_TIMEOUT:
151 not_failed = _CheckSlavesLeftStatus(slaves_to_check)
152 if slaves_to_check and not_failed:
153 time.sleep(_HEARTBEAT_TIMEOUT)
154 timeout += _HEARTBEAT_TIMEOUT
155 return len(slaves_to_check) == 0
156
157
158 def PublishStatus(status):
159 """Publishes status and Returns True if master received it.
160
161 This call is blocking until either the master pre-flight-queue bot picks
162 up the status, or a '_MAX_TIMEOUT' has passed.
163
164 Keyword arguments:
165 status -- should be a string and one of STATUS_BUILD_.*.
166
167 """
168 # Clean up queues.
169 try:
170 _command_queue.get_nowait()
171 except Queue.Empty: pass
172 try:
173 _status_queue.get_nowait()
174 except Queue.Empty: pass
175
176 _command_queue.put(_COMMAND_CHECK_STATUS)
177 _status_queue.put(status)
178 server = _TCPServerWithReuse(('localhost', _COMM_PORT),
179 _SlaveCommandHandler, _HEARTBEAT_TIMEOUT)
180 timeout = 0
181 response = None
182 try:
183 while not response and timeout < _MAX_TIMEOUT:
184 server.handle_request()
185 try:
186 response = _receive_queue.get_nowait()
187 except Queue.Empty:
188 print >> sys.stderr, ('(Slave) - Waiting for master to accept %s' % (
189 status))
190 timeout += _HEARTBEAT_TIMEOUT
191 response = None
192 except Exception, e:
193 print >> sys.stderr, '%s' % e
194 server.server_close()
195 return response != None
OLDNEW
« no previous file with comments | « bin/cbuildbot.py ('k') | bin/cbuildbot_comm_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698