Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(43)

Side by Side Diff: third_party/gsutil/gslib/parallelism_framework_util.py

Issue 1380943003: Roll version of gsutil to 4.15. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # -*- coding: utf-8 -*- 1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved. 2 # Copyright 2013 Google Inc. All Rights Reserved.
3 # 3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); 4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License. 5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at 6 # You may obtain a copy of the License at
7 # 7 #
8 # http://www.apache.org/licenses/LICENSE-2.0 8 # http://www.apache.org/licenses/LICENSE-2.0
9 # 9 #
10 # Unless required by applicable law or agreed to in writing, software 10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, 11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and 13 # See the License for the specific language governing permissions and
14 # limitations under the License. 14 # limitations under the License.
15 """Utility classes for the parallelism framework.""" 15 """Utility classes for the parallelism framework."""
16 16
17 from __future__ import absolute_import 17 from __future__ import absolute_import
18 18
19 import multiprocessing
20 import threading 19 import threading
21 20
22 21
23 class BasicIncrementDict(object): 22 class AtomicDict(object):
24 """Dictionary meant for storing values for which increment is defined. 23 """Thread-safe (and optionally process-safe) dictionary protected by a lock.
25 24
26 This handles any values for which the "+" operation is defined (e.g., floats, 25 If a multiprocessing.Manager is supplied on init, the dictionary is
27 lists, etc.). This class is neither thread- nor process-safe. 26 both process and thread safe. Otherwise, it is only thread-safe.
28 """ 27 """
29 28
30 def __init__(self): 29 def __init__(self, manager=None):
31 self.dict = {} 30 """Initializes the dict.
32
33 def Get(self, key, default_value=None):
34 return self.dict.get(key, default_value)
35
36 def Put(self, key, value):
37 self.dict[key] = value
38
39 def Update(self, key, inc, default_value=0):
40 """Update the stored value associated with the given key.
41
42 Performs the equivalent of
43 self.put(key, self.get(key, default_value) + inc).
44 31
45 Args: 32 Args:
46 key: lookup key for the value of the first operand of the "+" operation. 33 manager: multiprocessing.Manager instance (required for process safety).
47 inc: Second operand of the "+" operation.
48 default_value: Default value if there is no existing value for the key.
49
50 Returns:
51 Incremented value.
52 """ 34 """
53 val = self.dict.get(key, default_value) + inc 35 if manager:
54 self.dict[key] = val 36 self.lock = manager.Lock()
55 return val 37 self.dict = manager.dict()
56 38 else:
57 39 self.lock = threading.Lock()
58 class AtomicIncrementDict(BasicIncrementDict): 40 self.dict = {}
59 """Dictionary meant for storing values for which increment is defined.
60
61 This handles any values for which the "+" operation is defined (e.g., floats,
62 lists, etc.) in a thread- and process-safe way that allows for atomic get,
63 put, and update.
64 """
65
66 def __init__(self, manager): # pylint: disable=super-init-not-called
67 self.dict = ThreadAndProcessSafeDict(manager)
68 self.lock = multiprocessing.Lock()
69
70 def Update(self, key, inc, default_value=0):
71 """Atomically update the stored value associated with the given key.
72
73 Performs the atomic equivalent of
74 self.put(key, self.get(key, default_value) + inc).
75
76 Args:
77 key: lookup key for the value of the first operand of the "+" operation.
78 inc: Second operand of the "+" operation.
79 default_value: Default value if there is no existing value for the key.
80
81 Returns:
82 Incremented value.
83 """
84 with self.lock:
85 return super(AtomicIncrementDict, self).Update(key, inc, default_value)
86
87
88 class ThreadSafeDict(object):
89 """Provides a thread-safe dictionary (protected by a lock)."""
90
91 def __init__(self):
92 """Initializes the thread-safe dict."""
93 self.lock = threading.Lock()
94 self.dict = {}
95 41
96 def __getitem__(self, key): 42 def __getitem__(self, key):
97 with self.lock: 43 with self.lock:
98 return self.dict[key] 44 return self.dict[key]
99 45
100 def __setitem__(self, key, value): 46 def __setitem__(self, key, value):
101 with self.lock: 47 with self.lock:
102 self.dict[key] = value 48 self.dict[key] = value
103 49
104 # pylint: disable=invalid-name 50 # pylint: disable=invalid-name
105 def get(self, key, default_value=None): 51 def get(self, key, default_value=None):
106 with self.lock: 52 with self.lock:
107 return self.dict.get(key, default_value) 53 return self.dict.get(key, default_value)
108 54
109 def delete(self, key): 55 def delete(self, key):
110 with self.lock: 56 with self.lock:
111 del self.dict[key] 57 del self.dict[key]
112 58
59 def Increment(self, key, inc, default_value=0):
60 """Atomically updates the stored value associated with the given key.
113 61
114 class ThreadAndProcessSafeDict(ThreadSafeDict): 62 Performs the atomic equivalent of
115 """Wraps a multiprocessing.Manager's proxy objects for thread-safety. 63 dict[key] = dict.get(key, default_value) + inc.
116
117 The proxy objects returned by a manager are process-safe but not necessarily
118 thread-safe, so this class simply wraps their access with a lock for ease of
119 use. Since the objects are process-safe, we can use the more efficient
120 threading Lock.
121 """
122
123 def __init__(self, manager):
124 """Initializes the thread and process safe dict.
125 64
126 Args: 65 Args:
127 manager: Multiprocessing.manager object. 66 key: lookup key for the value of the first operand of the "+" operation.
67 inc: Second operand of the "+" operation.
68 default_value: Default value if there is no existing value for the key.
69
70 Returns:
71 Incremented value.
128 """ 72 """
129 super(ThreadAndProcessSafeDict, self).__init__() 73 with self.lock:
130 self.dict = manager.dict() 74 val = self.dict.get(key, default_value) + inc
75 self.dict[key] = val
76 return val
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698