Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(24)

Side by Side Diff: bin/cbuildbot_comm.py

Issue 6286040: Remove buildbot code that has been added to chromite. (Closed) Base URL: http://git.chromium.org/git/crosutils.git@master
Patch Set: Verify Created 9 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « bin/cbuildbot.py ('k') | bin/cbuildbot_comm_unittest.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Module contains communication methods between cbuildbot instances."""
6
7 import Queue
8 import SocketServer
9 import os
10 import socket
11 import sys
12 import time
13
14 sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
15 from cros_build_lib import Info, Warning, RunCommand
16
17 # Communication port for master to slave communication.
18 _COMM_PORT = 32890
19 # TCP Buffer Size.
20 _BUFFER = 4096
21 # Timeout between checks for new status by either end.
22 _HEARTBEAT_TIMEOUT = 60 # in sec.
23 # Max Timeout to wait before assuming failure.
24 _MAX_TIMEOUT = 30 * 60 # in sec.
25
26 # Commands - sent to slave from master.
27
28 # Report whether you have completed or failed building.
29 _COMMAND_CHECK_STATUS = 'check-status'
30
31 # Return status - response to commands from slaves (self.explanatory)
32 _STATUS_COMMAND_REJECTED = 'rejected'
33 _STATUS_TIMEOUT = 'timeout'
34 # Public for cbuildbot.
35 STATUS_BUILD_COMPLETE = 'complete'
36 STATUS_BUILD_FAILED = 'failure'
37
38 # Global queues to communicate with server.
39 _status_queue = Queue.Queue(1)
40 _receive_queue = Queue.Queue(1)
41 _command_queue = Queue.Queue(1)
42
43 class _TCPServerWithReuse(SocketServer.TCPServer):
44 """TCPServer that allows re-use of socket and timed out sockets."""
45 SocketServer.TCPServer.allow_reuse_address = True
46
47 def __init__(self, address, handler, timeout):
48 SocketServer.TCPServer.__init__(self, address, handler)
49 self.socket.settimeout(timeout)
50
51
52 class _SlaveCommandHandler(SocketServer.BaseRequestHandler):
53 """Handles requests from a master pre-flight-queue bot."""
54
55 def _HandleCommand(self, command, args):
56 """Handles command and returns status for master."""
57 Info('(Slave) - Received command %s with args %s' % (command, args))
58 command_to_expect = _command_queue.get()
59 # Check status also adds an entry on the status queue.
60 if command_to_expect == _COMMAND_CHECK_STATUS:
61 slave_status = _status_queue.get()
62 # Safety check to make sure the server is in a good state.
63 if command_to_expect != command:
64 Warning(
65 '(Slave) - Rejecting command %s. Was expecting %s.' % (command,
66 command_to_expect))
67 return _STATUS_COMMAND_REJECTED
68 # Give slave command with optional args.
69 _receive_queue.put(args)
70 if command == _COMMAND_CHECK_STATUS:
71 # Returns status to send.
72 return slave_status
73
74 def handle(self):
75 """Overriden. Handles commands sent from master."""
76 data = self.request.recv(_BUFFER).strip()
77 (command, args) = data.split('\n')
78 response = self._HandleCommand(command, args)
79 self.request.send(response)
80
81
82 def _GetSlaveNames(configuration):
83 """Returns an array of slave hostnames that are important."""
84 slaves = []
85 for slave_config in configuration.items():
86 if (not slave_config[1]['master'] and
87 slave_config[1]['important']):
88 slaves.append(slave_config[1]['hostname'])
89 return slaves
90
91
92 def _SendCommand(hostname, command, args):
93 """Returns response from host or _STATUS_TIMEOUT on error."""
94 data = '%s\n%s\n' % (command, args)
95 Info('(Master) - Sending %s %s to %s' % (command, args, hostname))
96
97 # Create a socket (SOCK_STREAM means a TCP socket).
98 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
99
100 try:
101 # Connect to server and send data
102 sock.connect((hostname, _COMM_PORT))
103 sock.send(data)
104
105 # Receive data from the server and shut down.
106 received = sock.recv(_BUFFER)
107 except:
108 received = _STATUS_TIMEOUT
109 finally:
110 sock.close()
111 return received
112
113
114 def _CheckSlavesLeftStatus(slaves_to_check):
115 """Returns True if remaining slaves have completed.
116
117 Once a slave reports STATUS_BUILD_COMPLETE, removes slave from list. Returns
118 True as long as no slave reports STATUS_BUILD_FAILED.
119
120 Keyword arguments:
121 slaves_to_check -- Array of hostnames to check.
122
123 """
124 slaves_to_remove = []
125 for slave in slaves_to_check:
126 status = _SendCommand(slave, _COMMAND_CHECK_STATUS, 'empty')
127 if status == STATUS_BUILD_FAILED:
128 Warning('(Master) - Slave %s failed' % slave)
129 return False
130 elif status == STATUS_BUILD_COMPLETE:
131 Info('(Master) - Slave %s completed' % slave)
132 slaves_to_remove.append(slave)
133 for slave in slaves_to_remove:
134 slaves_to_check.remove(slave)
135 return True
136
137
138 def HaveSlavesCompleted(configuration):
139 """Returns True if all other slaves have succeeded.
140
141 Checks other slaves status until either '_MAX_TIMEOUT' has passed,
142 at least one slaves reports a failure, or all slaves report success.
143
144 Keyword arguments:
145 configuration -- configuration dictionary for slaves.
146
147 """
148 not_failed = True
149 slaves_to_check = _GetSlaveNames(configuration)
150 timeout = 0
151 while slaves_to_check and not_failed and timeout < _MAX_TIMEOUT:
152 not_failed = _CheckSlavesLeftStatus(slaves_to_check)
153 if slaves_to_check and not_failed:
154 time.sleep(_HEARTBEAT_TIMEOUT)
155 timeout += _HEARTBEAT_TIMEOUT
156 return len(slaves_to_check) == 0
157
158
159 def PublishStatus(status):
160 """Publishes status and Returns True if master received it.
161
162 This call is blocking until either the master pre-flight-queue bot picks
163 up the status, or a '_MAX_TIMEOUT' has passed.
164
165 Keyword arguments:
166 status -- should be a string and one of STATUS_BUILD_.*.
167
168 """
169 # Clean up queues.
170 try:
171 _command_queue.get_nowait()
172 except Queue.Empty: pass
173 try:
174 _status_queue.get_nowait()
175 except Queue.Empty: pass
176
177 _command_queue.put(_COMMAND_CHECK_STATUS)
178 _status_queue.put(status)
179 server = _TCPServerWithReuse(('localhost', _COMM_PORT),
180 _SlaveCommandHandler, _HEARTBEAT_TIMEOUT)
181 timeout = 0
182 response = None
183 try:
184 while not response and timeout < _MAX_TIMEOUT:
185 server.handle_request()
186 try:
187 response = _receive_queue.get_nowait()
188 except Queue.Empty:
189 Info('(Slave) - Waiting for master to accept %s' % status)
190 timeout += _HEARTBEAT_TIMEOUT
191 response = None
192 except Exception, e:
193 Warning('%s' % e)
194 server.server_close()
195 return response != None
OLDNEW
« no previous file with comments | « bin/cbuildbot.py ('k') | bin/cbuildbot_comm_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698