OLD | NEW |
1 import sys, socket, errno, logging | 1 import sys, socket, errno, logging |
2 from time import time, sleep | 2 from time import time, sleep |
3 from autotest_lib.client.common_lib import error | 3 from autotest_lib.client.common_lib import error |
4 | 4 |
5 # default barrier port | 5 # default barrier port |
6 _DEFAULT_PORT = 11922 | 6 _DEFAULT_PORT = 11922 |
7 | 7 |
| 8 def get_host_from_id(hostid): |
| 9 # Remove any trailing local identifier following a #. |
| 10 # This allows multiple members per host which is particularly |
| 11 # helpful in testing. |
| 12 if not hostid.startswith('#'): |
| 13 return hostid.split('#')[0] |
| 14 else: |
| 15 raise error.BarrierError( |
| 16 "Invalid Host id: Host Address should be specified") |
| 17 |
8 | 18 |
9 class BarrierAbortError(error.BarrierError): | 19 class BarrierAbortError(error.BarrierError): |
10 """Special BarrierError raised when an explicit abort is requested.""" | 20 """Special BarrierError raised when an explicit abort is requested.""" |
11 | 21 |
12 | 22 |
13 class listen_server(object): | 23 class listen_server(object): |
14 """ | 24 """ |
15 Manages a listening socket for barrier. | 25 Manages a listening socket for barrier. |
16 | 26 |
17 Can be used to run multiple barrier instances with the same listening | 27 Can be used to run multiple barrier instances with the same listening |
(...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
152 logging.info("tag=%s port=%d timeout=%r", | 162 logging.info("tag=%s port=%d timeout=%r", |
153 self._tag, self._port, self._timeout_secs) | 163 self._tag, self._port, self._timeout_secs) |
154 | 164 |
155 # Number of clients seen (should be the length of self._waiting). | 165 # Number of clients seen (should be the length of self._waiting). |
156 self._seen = 0 | 166 self._seen = 0 |
157 | 167 |
158 # Clients who have checked in and are waiting (if we are a master). | 168 # Clients who have checked in and are waiting (if we are a master). |
159 self._waiting = {} # Maps from hostname -> (client, addr) tuples. | 169 self._waiting = {} # Maps from hostname -> (client, addr) tuples. |
160 | 170 |
161 | 171 |
162 def _get_host_from_id(self, hostid): | |
163 # Remove any trailing local identifier following a #. | |
164 # This allows multiple members per host which is particularly | |
165 # helpful in testing. | |
166 if not hostid.startswith('#'): | |
167 return hostid.split('#')[0] | |
168 else: | |
169 raise error.BarrierError( | |
170 "Invalid Host id: Host Address should be specified") | |
171 | |
172 | |
173 def _update_timeout(self, timeout): | 172 def _update_timeout(self, timeout): |
174 if timeout is not None and self._start_time is not None: | 173 if timeout is not None and self._start_time is not None: |
175 self._timeout_secs = (time() - self._start_time) + timeout | 174 self._timeout_secs = (time() - self._start_time) + timeout |
176 else: | 175 else: |
177 self._timeout_secs = timeout | 176 self._timeout_secs = timeout |
178 | 177 |
179 | 178 |
180 def _remaining(self): | 179 def _remaining(self): |
181 if self._timeout_secs is not None and self._start_time is not None: | 180 if self._timeout_secs is not None and self._start_time is not None: |
182 timeout = self._timeout_secs - (time() - self._start_time) | 181 timeout = self._timeout_secs - (time() - self._start_time) |
(...skipping 207 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
390 | 389 |
391 | 390 |
392 def _run_client(self, is_master): | 391 def _run_client(self, is_master): |
393 while self._remaining() is None or self._remaining() > 0: | 392 while self._remaining() is None or self._remaining() > 0: |
394 try: | 393 try: |
395 remote = socket.socket(socket.AF_INET, | 394 remote = socket.socket(socket.AF_INET, |
396 socket.SOCK_STREAM) | 395 socket.SOCK_STREAM) |
397 remote.settimeout(30) | 396 remote.settimeout(30) |
398 if is_master: | 397 if is_master: |
399 # Connect to all slaves. | 398 # Connect to all slaves. |
400 host = self._get_host_from_id(self._members[self._seen]) | 399 host = get_host_from_id(self._members[self._seen]) |
401 logging.info("calling slave: %s", host) | 400 logging.info("calling slave: %s", host) |
402 connection = (remote, (host, self._port)) | 401 connection = (remote, (host, self._port)) |
403 remote.connect(connection[1]) | 402 remote.connect(connection[1]) |
404 self._master_welcome(connection) | 403 self._master_welcome(connection) |
405 else: | 404 else: |
406 # Just connect to the master. | 405 # Just connect to the master. |
407 host = self._get_host_from_id(self._masterid) | 406 host = get_host_from_id(self._masterid) |
408 logging.info("calling master") | 407 logging.info("calling master") |
409 connection = (remote, (host, self._port)) | 408 connection = (remote, (host, self._port)) |
410 remote.connect(connection[1]) | 409 remote.connect(connection[1]) |
411 self._slave_hello(connection) | 410 self._slave_hello(connection) |
412 except socket.timeout: | 411 except socket.timeout: |
413 logging.warn("timeout calling host, retry") | 412 logging.warn("timeout calling host, retry") |
414 sleep(10) | 413 sleep(10) |
415 pass | 414 pass |
416 except socket.error, err: | 415 except socket.error, err: |
417 (code, str) = err | 416 (code, str) = err |
(...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
534 self._seen = 0 | 533 self._seen = 0 |
535 self._waiting = {} | 534 self._waiting = {} |
536 | 535 |
537 # Figure out who is the master in this barrier. | 536 # Figure out who is the master in this barrier. |
538 if self._hostid == self._masterid: | 537 if self._hostid == self._masterid: |
539 logging.info("selected as master") | 538 logging.info("selected as master") |
540 self._run_client(is_master=True) | 539 self._run_client(is_master=True) |
541 else: | 540 else: |
542 logging.info("selected as slave") | 541 logging.info("selected as slave") |
543 self._run_server(is_master=False) | 542 self._run_server(is_master=False) |
OLD | NEW |