Index: client/tests/kvm/rss_file_transfer.py |
diff --git a/client/tests/kvm/rss_file_transfer.py b/client/tests/kvm/rss_file_transfer.py |
index 3de8259cca17d8c29870ce754d96a451c1e36123..4d00d174e48753a3d7a55e1b17cb7a6d795bbac2 100755 |
--- a/client/tests/kvm/rss_file_transfer.py |
+++ b/client/tests/kvm/rss_file_transfer.py |
@@ -27,7 +27,21 @@ RSS_DONE = 9 |
class FileTransferError(Exception): |
- pass |
+ def __init__(self, msg, e=None, filename=None): |
+ Exception.__init__(self, msg, e, filename) |
+ self.msg = msg |
+ self.e = e |
+ self.filename = filename |
+ |
+ def __str__(self): |
+ s = self.msg |
+ if self.e and self.filename: |
+ s += " (error: %s, filename: %s)" % (self.e, self.filename) |
+ elif self.e: |
+ s += " (%s)" % self.e |
+ elif self.filename: |
+ s += " (filename: %s)" % self.filename |
+ return s |
class FileTransferConnectError(FileTransferError): |
@@ -42,12 +56,19 @@ class FileTransferProtocolError(FileTransferError): |
pass |
-class FileTransferSendError(FileTransferError): |
+class FileTransferSocketError(FileTransferError): |
pass |
class FileTransferServerError(FileTransferError): |
- pass |
+ def __init__(self, errmsg): |
+ FileTransferError.__init__(self, None, errmsg) |
+ |
+ def __str__(self): |
+ s = "Server said: %r" % self.e |
+ if self.filename: |
+ s += " (filename: %s)" % self.filename |
+ return s |
class FileTransferNotFoundError(FileTransferError): |
@@ -59,23 +80,24 @@ class FileTransferClient(object): |
Connect to a RSS (remote shell server) and transfer files. |
""" |
- def __init__(self, address, port, timeout=10): |
+ def __init__(self, address, port, log_func=None, timeout=20): |
""" |
Connect to a server. |
@param address: The server's address |
@param port: The server's port |
+ @param log_func: If provided, transfer stats will be passed to this |
+ function during the transfer |
@param timeout: Time duration to wait for connection to succeed |
@raise FileTransferConnectError: Raised if the connection fails |
- @raise FileTransferProtocolError: Raised if an incorrect magic number |
- is received |
""" |
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
self._socket.settimeout(timeout) |
try: |
self._socket.connect((address, port)) |
- except socket.error: |
- raise FileTransferConnectError("Could not connect to server") |
+ except socket.error, e: |
+ raise FileTransferConnectError("Cannot connect to server at " |
+ "%s:%s" % (address, port), e) |
try: |
if self._receive_msg(timeout) != RSS_MAGIC: |
raise FileTransferConnectError("Received wrong magic number") |
@@ -83,6 +105,10 @@ class FileTransferClient(object): |
raise FileTransferConnectError("Timeout expired while waiting to " |
"receive magic number") |
self._send(struct.pack("=i", CHUNKSIZE)) |
+ self._log_func = log_func |
+ self._last_time = time.time() |
+ self._last_transferred = 0 |
+ self.transferred = 0 |
def __del__(self): |
@@ -96,86 +122,116 @@ class FileTransferClient(object): |
self._socket.close() |
- def _send(self, str): |
+ def _send(self, str, timeout=60): |
try: |
+ if timeout <= 0: |
+ raise socket.timeout |
+ self._socket.settimeout(timeout) |
self._socket.sendall(str) |
- except socket.error: |
- raise FileTransferSendError("Could not send data to server") |
+ except socket.timeout: |
+ raise FileTransferTimeoutError("Timeout expired while sending " |
+ "data to server") |
+ except socket.error, e: |
+ raise FileTransferSocketError("Could not send data to server", e) |
- def _receive(self, size, timeout=10): |
+ def _receive(self, size, timeout=60): |
strs = [] |
end_time = time.time() + timeout |
- while size > 0: |
- try: |
- self._socket.settimeout(max(0.0001, end_time - time.time())) |
+ try: |
+ while size > 0: |
+ timeout = end_time - time.time() |
+ if timeout <= 0: |
+ raise socket.timeout |
+ self._socket.settimeout(timeout) |
data = self._socket.recv(size) |
- except socket.timeout: |
- raise FileTransferTimeoutError("Timeout expired while " |
- "receiving data from server") |
- except socket.error: |
- raise FileTransferProtocolError("Error receiving data from " |
- "server") |
- if not data: |
- raise FileTransferProtocolError("Connection closed " |
- "unexpectedly") |
- strs.append(data) |
- size -= len(data) |
+ if not data: |
+ raise FileTransferProtocolError("Connection closed " |
+ "unexpectedly while " |
+ "receiving data from " |
+ "server") |
+ strs.append(data) |
+ size -= len(data) |
+ except socket.timeout: |
+ raise FileTransferTimeoutError("Timeout expired while receiving " |
+ "data from server") |
+ except socket.error, e: |
+ raise FileTransferSocketError("Error receiving data from server", |
+ e) |
return "".join(strs) |
- def _send_packet(self, str): |
+ def _report_stats(self, str): |
+ if self._log_func: |
+ dt = time.time() - self._last_time |
+ if dt >= 1: |
+ transferred = self.transferred / 1048576. |
+ speed = (self.transferred - self._last_transferred) / dt |
+ speed /= 1048576. |
+ self._log_func("%s %.3f MB (%.3f MB/sec)" % |
+ (str, transferred, speed)) |
+ self._last_time = time.time() |
+ self._last_transferred = self.transferred |
+ |
+ |
+ def _send_packet(self, str, timeout=60): |
self._send(struct.pack("=I", len(str))) |
- self._send(str) |
+ self._send(str, timeout) |
+ self.transferred += len(str) + 4 |
+ self._report_stats("Sent") |
- def _receive_packet(self, timeout=10): |
+ def _receive_packet(self, timeout=60): |
size = struct.unpack("=I", self._receive(4))[0] |
- return self._receive(size, timeout) |
+ str = self._receive(size, timeout) |
+ self.transferred += len(str) + 4 |
+ self._report_stats("Received") |
+ return str |
- def _send_file_chunks(self, filename, timeout=30): |
+ def _send_file_chunks(self, filename, timeout=60): |
+ if self._log_func: |
+ self._log_func("Sending file %s" % filename) |
f = open(filename, "rb") |
try: |
- end_time = time.time() + timeout |
- while time.time() < end_time: |
- data = f.read(CHUNKSIZE) |
- self._send_packet(data) |
- if len(data) < CHUNKSIZE: |
- break |
- else: |
- raise FileTransferTimeoutError("Timeout expired while sending " |
- "file %s" % filename) |
+ try: |
+ end_time = time.time() + timeout |
+ while True: |
+ data = f.read(CHUNKSIZE) |
+ self._send_packet(data, end_time - time.time()) |
+ if len(data) < CHUNKSIZE: |
+ break |
+ except FileTransferError, e: |
+ e.filename = filename |
+ raise |
finally: |
f.close() |
- def _receive_file_chunks(self, filename, timeout=30): |
+ def _receive_file_chunks(self, filename, timeout=60): |
+ if self._log_func: |
+ self._log_func("Receiving file %s" % filename) |
f = open(filename, "wb") |
try: |
- end_time = time.time() + timeout |
- while True: |
- try: |
+ try: |
+ end_time = time.time() + timeout |
+ while True: |
data = self._receive_packet(end_time - time.time()) |
- except FileTransferTimeoutError: |
- raise FileTransferTimeoutError("Timeout expired while " |
- "receiving file %s" % |
- filename) |
- except FileTransferProtocolError: |
- raise FileTransferProtocolError("Error receiving file %s" % |
- filename) |
- f.write(data) |
- if len(data) < CHUNKSIZE: |
- break |
+ f.write(data) |
+ if len(data) < CHUNKSIZE: |
+ break |
+ except FileTransferError, e: |
+ e.filename = filename |
+ raise |
finally: |
f.close() |
- def _send_msg(self, msg, timeout=10): |
+ def _send_msg(self, msg, timeout=60): |
self._send(struct.pack("=I", msg)) |
- def _receive_msg(self, timeout=10): |
+ def _receive_msg(self, timeout=60): |
s = self._receive(4, timeout) |
return struct.unpack("=I", s)[0] |
@@ -191,7 +247,7 @@ class FileTransferClient(object): |
raise e[0], e[1], e[2] |
if msg == RSS_ERROR: |
errmsg = self._receive_packet() |
- raise FileTransferServerError("Server said: %s" % errmsg) |
+ raise FileTransferServerError(errmsg) |
raise e[0], e[1], e[2] |
@@ -200,20 +256,22 @@ class FileUploadClient(FileTransferClient): |
Connect to a RSS (remote shell server) and upload files or directory trees. |
""" |
- def __init__(self, address, port, timeout=10): |
+ def __init__(self, address, port, log_func=None, timeout=20): |
""" |
Connect to a server. |
@param address: The server's address |
@param port: The server's port |
+ @param log_func: If provided, transfer stats will be passed to this |
+ function during the transfer |
@param timeout: Time duration to wait for connection to succeed |
@raise FileTransferConnectError: Raised if the connection fails |
@raise FileTransferProtocolError: Raised if an incorrect magic number |
is received |
- @raise FileTransferSendError: Raised if the RSS_UPLOAD message cannot |
+ @raise FileTransferSocketError: Raised if the RSS_UPLOAD message cannot |
be sent to the server |
""" |
- super(FileUploadClient, self).__init__(address, port, timeout) |
+ super(FileUploadClient, self).__init__(address, port, log_func, timeout) |
self._send_msg(RSS_UPLOAD) |
@@ -221,7 +279,7 @@ class FileUploadClient(FileTransferClient): |
if os.path.isfile(path): |
self._send_msg(RSS_CREATE_FILE) |
self._send_packet(os.path.basename(path)) |
- self._send_file_chunks(path, max(0, end_time - time.time())) |
+ self._send_file_chunks(path, end_time - time.time()) |
elif os.path.isdir(path): |
self._send_msg(RSS_CREATE_DIR) |
self._send_packet(os.path.basename(path)) |
@@ -277,12 +335,12 @@ class FileUploadClient(FileTransferClient): |
"directories" % |
src_pattern) |
# Look for RSS_OK or RSS_ERROR |
- msg = self._receive_msg(max(0, end_time - time.time())) |
+ msg = self._receive_msg(end_time - time.time()) |
if msg == RSS_OK: |
return |
elif msg == RSS_ERROR: |
errmsg = self._receive_packet() |
- raise FileTransferServerError("Server said: %s" % errmsg) |
+ raise FileTransferServerError(errmsg) |
else: |
# Neither RSS_OK nor RSS_ERROR found |
raise FileTransferProtocolError("Received unexpected msg") |
@@ -297,12 +355,14 @@ class FileDownloadClient(FileTransferClient): |
Connect to a RSS (remote shell server) and download files or directory trees. |
""" |
- def __init__(self, address, port, timeout=10): |
+ def __init__(self, address, port, log_func=None, timeout=20): |
""" |
Connect to a server. |
@param address: The server's address |
@param port: The server's port |
+ @param log_func: If provided, transfer stats will be passed to this |
+ function during the transfer |
@param timeout: Time duration to wait for connection to succeed |
@raise FileTransferConnectError: Raised if the connection fails |
@raise FileTransferProtocolError: Raised if an incorrect magic number |
@@ -310,7 +370,7 @@ class FileDownloadClient(FileTransferClient): |
@raise FileTransferSendError: Raised if the RSS_UPLOAD message cannot |
be sent to the server |
""" |
- super(FileDownloadClient, self).__init__(address, port, timeout) |
+ super(FileDownloadClient, self).__init__(address, port, log_func, timeout) |
self._send_msg(RSS_DOWNLOAD) |
@@ -358,8 +418,7 @@ class FileDownloadClient(FileTransferClient): |
filename = self._receive_packet() |
if os.path.isdir(dst_path): |
dst_path = os.path.join(dst_path, filename) |
- self._receive_file_chunks( |
- dst_path, max(0, end_time - time.time())) |
+ self._receive_file_chunks(dst_path, end_time - time.time()) |
dst_path = os.path.dirname(dst_path) |
file_count += 1 |
elif msg == RSS_CREATE_DIR: |
@@ -385,7 +444,7 @@ class FileDownloadClient(FileTransferClient): |
elif msg == RSS_ERROR: |
# Receive error message and abort |
errmsg = self._receive_packet() |
- raise FileTransferServerError("Server said: %s" % errmsg) |
+ raise FileTransferServerError(errmsg) |
else: |
# Unexpected msg |
raise FileTransferProtocolError("Received unexpected msg") |
@@ -395,26 +454,26 @@ class FileDownloadClient(FileTransferClient): |
raise |
-def upload(address, port, src_pattern, dst_path, timeout=60, |
- connect_timeout=10): |
+def upload(address, port, src_pattern, dst_path, log_func=None, timeout=60, |
+ connect_timeout=20): |
""" |
Connect to server and upload files. |
@see: FileUploadClient |
""" |
- client = FileUploadClient(address, port, connect_timeout) |
+ client = FileUploadClient(address, port, log_func, connect_timeout) |
client.upload(src_pattern, dst_path, timeout) |
client.close() |
-def download(address, port, src_pattern, dst_path, timeout=60, |
- connect_timeout=10): |
+def download(address, port, src_pattern, dst_path, log_func=None, timeout=60, |
+ connect_timeout=20): |
""" |
Connect to server and upload files. |
@see: FileDownloadClient |
""" |
- client = FileDownloadClient(address, port, connect_timeout) |
+ client = FileDownloadClient(address, port, log_func, connect_timeout) |
client.download(src_pattern, dst_path, timeout) |
client.close() |
@@ -430,6 +489,9 @@ def main(): |
parser.add_option("-u", "--upload", |
action="store_true", dest="upload", |
help="upload files to server") |
+ parser.add_option("-v", "--verbose", |
+ action="store_true", dest="verbose", |
+ help="be verbose") |
parser.add_option("-t", "--timeout", |
type="int", dest="timeout", default=3600, |
help="transfer timeout") |
@@ -441,10 +503,16 @@ def main(): |
address, port, src_pattern, dst_path = args |
port = int(port) |
+ logger = None |
+ if options.verbose: |
+ def p(s): |
+ print s |
+ logger = p |
+ |
if options.download: |
- download(address, port, src_pattern, dst_path, options.timeout) |
+ download(address, port, src_pattern, dst_path, logger, options.timeout) |
elif options.upload: |
- upload(address, port, src_pattern, dst_path, options.timeout) |
+ upload(address, port, src_pattern, dst_path, logger, options.timeout) |
if __name__ == "__main__": |