| OLD | NEW |
| 1 import sys, socket, errno, logging | 1 import sys, socket, errno, logging |
| 2 from time import time, sleep | 2 from time import time, sleep |
| 3 from autotest_lib.client.common_lib import error | 3 from autotest_lib.client.common_lib import error |
| 4 | 4 |
| 5 # default barrier port | 5 # default barrier port |
| 6 _DEFAULT_PORT = 11922 | 6 _DEFAULT_PORT = 11922 |
| 7 | 7 |
| 8 def get_host_from_id(hostid): |
| 9 # Remove any trailing local identifier following a #. |
| 10 # This allows multiple members per host which is particularly |
| 11 # helpful in testing. |
| 12 if not hostid.startswith('#'): |
| 13 return hostid.split('#')[0] |
| 14 else: |
| 15 raise error.BarrierError( |
| 16 "Invalid Host id: Host Address should be specified") |
| 17 |
| 8 | 18 |
| 9 class BarrierAbortError(error.BarrierError): | 19 class BarrierAbortError(error.BarrierError): |
| 10 """Special BarrierError raised when an explicit abort is requested.""" | 20 """Special BarrierError raised when an explicit abort is requested.""" |
| 11 | 21 |
| 12 | 22 |
| 13 class listen_server(object): | 23 class listen_server(object): |
| 14 """ | 24 """ |
| 15 Manages a listening socket for barrier. | 25 Manages a listening socket for barrier. |
| 16 | 26 |
| 17 Can be used to run multiple barrier instances with the same listening | 27 Can be used to run multiple barrier instances with the same listening |
| (...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 152 logging.info("tag=%s port=%d timeout=%r", | 162 logging.info("tag=%s port=%d timeout=%r", |
| 153 self._tag, self._port, self._timeout_secs) | 163 self._tag, self._port, self._timeout_secs) |
| 154 | 164 |
| 155 # Number of clients seen (should be the length of self._waiting). | 165 # Number of clients seen (should be the length of self._waiting). |
| 156 self._seen = 0 | 166 self._seen = 0 |
| 157 | 167 |
| 158 # Clients who have checked in and are waiting (if we are a master). | 168 # Clients who have checked in and are waiting (if we are a master). |
| 159 self._waiting = {} # Maps from hostname -> (client, addr) tuples. | 169 self._waiting = {} # Maps from hostname -> (client, addr) tuples. |
| 160 | 170 |
| 161 | 171 |
| 162 def _get_host_from_id(self, hostid): | |
| 163 # Remove any trailing local identifier following a #. | |
| 164 # This allows multiple members per host which is particularly | |
| 165 # helpful in testing. | |
| 166 if not hostid.startswith('#'): | |
| 167 return hostid.split('#')[0] | |
| 168 else: | |
| 169 raise error.BarrierError( | |
| 170 "Invalid Host id: Host Address should be specified") | |
| 171 | |
| 172 | |
| 173 def _update_timeout(self, timeout): | 172 def _update_timeout(self, timeout): |
| 174 if timeout is not None and self._start_time is not None: | 173 if timeout is not None and self._start_time is not None: |
| 175 self._timeout_secs = (time() - self._start_time) + timeout | 174 self._timeout_secs = (time() - self._start_time) + timeout |
| 176 else: | 175 else: |
| 177 self._timeout_secs = timeout | 176 self._timeout_secs = timeout |
| 178 | 177 |
| 179 | 178 |
| 180 def _remaining(self): | 179 def _remaining(self): |
| 181 if self._timeout_secs is not None and self._start_time is not None: | 180 if self._timeout_secs is not None and self._start_time is not None: |
| 182 timeout = self._timeout_secs - (time() - self._start_time) | 181 timeout = self._timeout_secs - (time() - self._start_time) |
| (...skipping 207 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 390 | 389 |
| 391 | 390 |
| 392 def _run_client(self, is_master): | 391 def _run_client(self, is_master): |
| 393 while self._remaining() is None or self._remaining() > 0: | 392 while self._remaining() is None or self._remaining() > 0: |
| 394 try: | 393 try: |
| 395 remote = socket.socket(socket.AF_INET, | 394 remote = socket.socket(socket.AF_INET, |
| 396 socket.SOCK_STREAM) | 395 socket.SOCK_STREAM) |
| 397 remote.settimeout(30) | 396 remote.settimeout(30) |
| 398 if is_master: | 397 if is_master: |
| 399 # Connect to all slaves. | 398 # Connect to all slaves. |
| 400 host = self._get_host_from_id(self._members[self._seen]) | 399 host = get_host_from_id(self._members[self._seen]) |
| 401 logging.info("calling slave: %s", host) | 400 logging.info("calling slave: %s", host) |
| 402 connection = (remote, (host, self._port)) | 401 connection = (remote, (host, self._port)) |
| 403 remote.connect(connection[1]) | 402 remote.connect(connection[1]) |
| 404 self._master_welcome(connection) | 403 self._master_welcome(connection) |
| 405 else: | 404 else: |
| 406 # Just connect to the master. | 405 # Just connect to the master. |
| 407 host = self._get_host_from_id(self._masterid) | 406 host = get_host_from_id(self._masterid) |
| 408 logging.info("calling master") | 407 logging.info("calling master") |
| 409 connection = (remote, (host, self._port)) | 408 connection = (remote, (host, self._port)) |
| 410 remote.connect(connection[1]) | 409 remote.connect(connection[1]) |
| 411 self._slave_hello(connection) | 410 self._slave_hello(connection) |
| 412 except socket.timeout: | 411 except socket.timeout: |
| 413 logging.warn("timeout calling host, retry") | 412 logging.warn("timeout calling host, retry") |
| 414 sleep(10) | 413 sleep(10) |
| 415 pass | 414 pass |
| 416 except socket.error, err: | 415 except socket.error, err: |
| 417 (code, str) = err | 416 (code, str) = err |
| (...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 534 self._seen = 0 | 533 self._seen = 0 |
| 535 self._waiting = {} | 534 self._waiting = {} |
| 536 | 535 |
| 537 # Figure out who is the master in this barrier. | 536 # Figure out who is the master in this barrier. |
| 538 if self._hostid == self._masterid: | 537 if self._hostid == self._masterid: |
| 539 logging.info("selected as master") | 538 logging.info("selected as master") |
| 540 self._run_client(is_master=True) | 539 self._run_client(is_master=True) |
| 541 else: | 540 else: |
| 542 logging.info("selected as slave") | 541 logging.info("selected as slave") |
| 543 self._run_server(is_master=False) | 542 self._run_server(is_master=False) |
| OLD | NEW |