| Index: appengine/swarming/swarming_bot/bot_code/file_reader_test.py
|
| diff --git a/appengine/swarming/swarming_bot/bot_code/file_reader_test.py b/appengine/swarming/swarming_bot/bot_code/file_reader_test.py
|
| new file mode 100755
|
| index 0000000000000000000000000000000000000000..f6d9a5489fe2da825a7e0851b888c7c98a6b1df4
|
| --- /dev/null
|
| +++ b/appengine/swarming/swarming_bot/bot_code/file_reader_test.py
|
| @@ -0,0 +1,72 @@
|
| +#!/usr/bin/env python
|
| +# Copyright 2016 The LUCI Authors. All rights reserved.
|
| +# Use of this source code is governed under the Apache License, Version 2.0
|
| +# that can be found in the LICENSE file.
|
| +
|
| +import json
|
| +import logging
|
| +import os
|
| +import sys
|
| +import tempfile
|
| +import time
|
| +import unittest
|
| +
|
| +import test_env_bot_code
|
| +test_env_bot_code.setup_test_env()
|
| +
|
| +from depot_tools import auto_stub
|
| +from depot_tools import fix_encoding
|
| +from utils import file_path
|
| +
|
| +import file_reader
|
| +
|
| +
|
| +class TestFileReaderThread(auto_stub.TestCase):
|
| + def setUp(self):
|
| + super(TestFileReaderThread, self).setUp()
|
| + self.root_dir = tempfile.mkdtemp(prefix='file_reader')
|
| + self.path = os.path.join(self.root_dir, 'target_file')
|
| +
|
| + def tearDown(self):
|
| + file_path.rmtree(self.root_dir)
|
| + super(TestFileReaderThread, self).tearDown()
|
| +
|
| + def test_works(self):
|
| + with open(self.path, 'wb') as f:
|
| + json.dump({'A': 'a'}, f)
|
| +
|
| + r = file_reader.FileReaderThread(self.path, 0.1)
|
| + r.start()
|
| + self.assertEqual(r.last_value, {'A': 'a'})
|
| +
|
| + # Change the file. Expect possible conflict on Windows.
|
| + attempt = 0
|
| + while True:
|
| + try:
|
| + file_path.atomic_replace(self.path, json.dumps({'B': 'b'}))
|
| + break
|
| + except OSError:
|
| + attempt += 1
|
| + if attempt == 20:
|
| + self.fail('Cannot replace the file, giving up')
|
| + time.sleep(0.05)
|
| +
|
| + # Give some reasonable time for the reader thread to pick up the change.
|
| + # This test will flake if for whatever reason OS thread scheduler is lagging
|
| + # for more than 2 seconds.
|
| + time.sleep(2)
|
| +
|
| + self.assertEqual(r.last_value, {'B': 'b'})
|
| +
|
| + def test_start_throws_on_error(self):
|
| + r = file_reader.FileReaderThread(
|
| + self.path + "_no_such_file", max_attempts=2)
|
| + with self.assertRaises(file_reader.FatalReadError):
|
| + r.start()
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + fix_encoding.fix_encoding()
|
| + logging.basicConfig(
|
| + level=logging.DEBUG if '-v' in sys.argv else logging.CRITICAL)
|
| + unittest.main()
|
|
|