Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(668)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/file_reader_test.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright 2016 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file.
5
6 import json
7 import logging
8 import os
9 import sys
10 import tempfile
11 import time
12 import unittest
13
14 import test_env_bot_code
15 test_env_bot_code.setup_test_env()
16
17 from depot_tools import auto_stub
18 from depot_tools import fix_encoding
19 from utils import file_path
20
21 import file_reader
22
23
24 class TestFileReaderThread(auto_stub.TestCase):
25 def setUp(self):
26 super(TestFileReaderThread, self).setUp()
27 self.root_dir = tempfile.mkdtemp(prefix='file_reader')
28 self.path = os.path.join(self.root_dir, 'target_file')
29
30 def tearDown(self):
31 file_path.rmtree(self.root_dir)
32 super(TestFileReaderThread, self).tearDown()
33
34 def test_works(self):
35 with open(self.path, 'wb') as f:
36 json.dump({'A': 'a'}, f)
37
38 r = file_reader.FileReaderThread(self.path, 0.1)
39 r.start()
40 self.assertEqual(r.last_value, {'A': 'a'})
41
42 # Change the file. Expect possible conflict on Windows.
43 attempt = 0
44 while True:
45 try:
46 file_path.atomic_replace(self.path, json.dumps({'B': 'b'}))
47 break
48 except OSError:
49 attempt += 1
50 if attempt == 20:
51 self.fail('Cannot replace the file, giving up')
52 time.sleep(0.05)
53
54 # Give some reasonable time for the reader thread to pick up the change.
55 # This test will flake if for whatever reason OS thread scheduler is lagging
56 # for more than 2 seconds.
57 time.sleep(2)
58
59 self.assertEqual(r.last_value, {'B': 'b'})
60
61 def test_start_throws_on_error(self):
62 r = file_reader.FileReaderThread(
63 self.path + "_no_such_file", max_attempts=2)
64 with self.assertRaises(file_reader.FatalReadError):
65 r.start()
66
67
68 if __name__ == '__main__':
69 fix_encoding.fix_encoding()
70 logging.basicConfig(
71 level=logging.DEBUG if '-v' in sys.argv else logging.CRITICAL)
72 unittest.main()
OLDNEW
« no previous file with comments | « appengine/swarming/swarming_bot/bot_code/file_reader.py ('k') | appengine/swarming/swarming_bot/bot_code/file_refresher.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698