OLD | NEW |
1 import sys, socket, errno, logging | 1 import sys, socket, errno, logging |
2 from time import time, sleep | 2 from time import time, sleep |
3 from autotest_lib.client.common_lib import error | 3 from autotest_lib.client.common_lib import error |
4 | 4 |
5 # default barrier port | 5 # default barrier port |
6 _DEFAULT_PORT = 11922 | 6 _DEFAULT_PORT = 11922 |
7 | 7 |
8 def get_host_from_id(hostid): | |
9 # Remove any trailing local identifier following a #. | |
10 # This allows multiple members per host which is particularly | |
11 # helpful in testing. | |
12 if not hostid.startswith('#'): | |
13 return hostid.split('#')[0] | |
14 else: | |
15 raise error.BarrierError( | |
16 "Invalid Host id: Host Address should be specified") | |
17 | |
18 | 8 |
19 class BarrierAbortError(error.BarrierError): | 9 class BarrierAbortError(error.BarrierError): |
20 """Special BarrierError raised when an explicit abort is requested.""" | 10 """Special BarrierError raised when an explicit abort is requested.""" |
21 | 11 |
22 | 12 |
23 class listen_server(object): | 13 class listen_server(object): |
24 """ | 14 """ |
25 Manages a listening socket for barrier. | 15 Manages a listening socket for barrier. |
26 | 16 |
27 Can be used to run multiple barrier instances with the same listening | 17 Can be used to run multiple barrier instances with the same listening |
(...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
162 logging.info("tag=%s port=%d timeout=%r", | 152 logging.info("tag=%s port=%d timeout=%r", |
163 self._tag, self._port, self._timeout_secs) | 153 self._tag, self._port, self._timeout_secs) |
164 | 154 |
165 # Number of clients seen (should be the length of self._waiting). | 155 # Number of clients seen (should be the length of self._waiting). |
166 self._seen = 0 | 156 self._seen = 0 |
167 | 157 |
168 # Clients who have checked in and are waiting (if we are a master). | 158 # Clients who have checked in and are waiting (if we are a master). |
169 self._waiting = {} # Maps from hostname -> (client, addr) tuples. | 159 self._waiting = {} # Maps from hostname -> (client, addr) tuples. |
170 | 160 |
171 | 161 |
| 162 def _get_host_from_id(self, hostid): |
| 163 # Remove any trailing local identifier following a #. |
| 164 # This allows multiple members per host which is particularly |
| 165 # helpful in testing. |
| 166 if not hostid.startswith('#'): |
| 167 return hostid.split('#')[0] |
| 168 else: |
| 169 raise error.BarrierError( |
| 170 "Invalid Host id: Host Address should be specified") |
| 171 |
| 172 |
172 def _update_timeout(self, timeout): | 173 def _update_timeout(self, timeout): |
173 if timeout is not None and self._start_time is not None: | 174 if timeout is not None and self._start_time is not None: |
174 self._timeout_secs = (time() - self._start_time) + timeout | 175 self._timeout_secs = (time() - self._start_time) + timeout |
175 else: | 176 else: |
176 self._timeout_secs = timeout | 177 self._timeout_secs = timeout |
177 | 178 |
178 | 179 |
179 def _remaining(self): | 180 def _remaining(self): |
180 if self._timeout_secs is not None and self._start_time is not None: | 181 if self._timeout_secs is not None and self._start_time is not None: |
181 timeout = self._timeout_secs - (time() - self._start_time) | 182 timeout = self._timeout_secs - (time() - self._start_time) |
(...skipping 207 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
389 | 390 |
390 | 391 |
391 def _run_client(self, is_master): | 392 def _run_client(self, is_master): |
392 while self._remaining() is None or self._remaining() > 0: | 393 while self._remaining() is None or self._remaining() > 0: |
393 try: | 394 try: |
394 remote = socket.socket(socket.AF_INET, | 395 remote = socket.socket(socket.AF_INET, |
395 socket.SOCK_STREAM) | 396 socket.SOCK_STREAM) |
396 remote.settimeout(30) | 397 remote.settimeout(30) |
397 if is_master: | 398 if is_master: |
398 # Connect to all slaves. | 399 # Connect to all slaves. |
399 host = get_host_from_id(self._members[self._seen]) | 400 host = self._get_host_from_id(self._members[self._seen]) |
400 logging.info("calling slave: %s", host) | 401 logging.info("calling slave: %s", host) |
401 connection = (remote, (host, self._port)) | 402 connection = (remote, (host, self._port)) |
402 remote.connect(connection[1]) | 403 remote.connect(connection[1]) |
403 self._master_welcome(connection) | 404 self._master_welcome(connection) |
404 else: | 405 else: |
405 # Just connect to the master. | 406 # Just connect to the master. |
406 host = get_host_from_id(self._masterid) | 407 host = self._get_host_from_id(self._masterid) |
407 logging.info("calling master") | 408 logging.info("calling master") |
408 connection = (remote, (host, self._port)) | 409 connection = (remote, (host, self._port)) |
409 remote.connect(connection[1]) | 410 remote.connect(connection[1]) |
410 self._slave_hello(connection) | 411 self._slave_hello(connection) |
411 except socket.timeout: | 412 except socket.timeout: |
412 logging.warn("timeout calling host, retry") | 413 logging.warn("timeout calling host, retry") |
413 sleep(10) | 414 sleep(10) |
414 pass | 415 pass |
415 except socket.error, err: | 416 except socket.error, err: |
416 (code, str) = err | 417 (code, str) = err |
(...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
533 self._seen = 0 | 534 self._seen = 0 |
534 self._waiting = {} | 535 self._waiting = {} |
535 | 536 |
536 # Figure out who is the master in this barrier. | 537 # Figure out who is the master in this barrier. |
537 if self._hostid == self._masterid: | 538 if self._hostid == self._masterid: |
538 logging.info("selected as master") | 539 logging.info("selected as master") |
539 self._run_client(is_master=True) | 540 self._run_client(is_master=True) |
540 else: | 541 else: |
541 logging.info("selected as slave") | 542 logging.info("selected as slave") |
542 self._run_server(is_master=False) | 543 self._run_server(is_master=False) |
OLD | NEW |