Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(9)

Side by Side Diff: client/cipd/internal/instancecache.go

Issue 1870263002: cipd: instance cache (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: restore check, improve comment Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/cipd/client.go ('k') | client/cipd/internal/instancecache_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package internal
6
7 import (
8 "bytes"
9 "container/heap"
10 "fmt"
11 "io"
12 "io/ioutil"
13 "math/rand"
14 "os"
15 "sync"
16 "time"
17
18 "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/proto/google"
20 "github.com/luci/luci-go/common/stringset"
21
22 "github.com/luci/luci-go/client/cipd/common"
23 "github.com/luci/luci-go/client/cipd/internal/messages"
24 "github.com/luci/luci-go/client/cipd/local"
25 )
26
// Tunables of the on-disk instance cache.
const (
	// instanceCacheStateFilename is the name of the state file, resolved
	// relative to the cache root.
	instanceCacheStateFilename = "state.db"

	// instanceCacheMaxSize is the number of cached instances above which gc
	// starts purging the least recently accessed ones.
	instanceCacheMaxSize = 300

	// instanceCacheSyncInterval determines how often state.db is
	// re-synchronized with the instance files found in the cache dir.
	instanceCacheSyncInterval = time.Hour * 8
)
34
35 // InstanceCache is a file-system-based, thread-safe, LRU cache of instances.
36 //
37 // Does not validate instance hashes; it is caller's responsibility.
38 type InstanceCache struct {
39 fs local.FileSystem
40 stateLock sync.Mutex // synchronizes access to the state file.
41 logger logging.Logger
42 }
43
44 // NewInstanceCache initializes InstanceCache.
45 // fs will be the root of the cache.
46 func NewInstanceCache(fs local.FileSystem, logger logging.Logger) *InstanceCache {
47 if logger == nil {
48 logger = logging.Null()
49 }
50 return &InstanceCache{
51 fs: fs,
52 logger: logger,
53 }
54 }
55
56 // Get searches for the instance in the cache and writes its contents to output.
57 // If the instance is not found, returns an os.IsNotExists error without writing
58 // to output.
59 func (c *InstanceCache) Get(pin common.Pin, output io.Writer, now time.Time) err or {
60 if err := common.ValidatePin(pin); err != nil {
61 return err
62 }
63
64 path, err := c.fs.RootRelToAbs(pin.InstanceID)
65 if err != nil {
66 return fmt.Errorf("invalid instance ID %q", pin.InstanceID)
67 }
68
69 f, err := os.Open(path)
70 if err != nil {
71 return err
72 }
73 defer f.Close()
74
75 c.withState(now, func(s *messages.InstanceCache) {
76 touch(s, pin.InstanceID, now)
77 })
78
79 _, err = io.Copy(output, f)
80 return err
81 }
82
83 // Put caches an instance.
84 // write must write the instance contents.
85 // May remove some instances from the cache that were not accessed for a long ti me.
86 func (c *InstanceCache) Put(pin common.Pin, now time.Time, write func(*os.File) error) error {
87 if err := common.ValidatePin(pin); err != nil {
88 return err
89 }
90 path, err := c.fs.RootRelToAbs(pin.InstanceID)
91 if err != nil {
92 return fmt.Errorf("invalid instance ID %q", pin.InstanceID)
93 }
94
95 if err := c.fs.EnsureFile(path, write); err != nil {
96 return err
97 }
98
99 c.withState(now, func(s *messages.InstanceCache) {
100 touch(s, pin.InstanceID, now)
101 c.gc(s)
102 })
103 return nil
104 }
105
// timeHeap implements heap.Interface over timestamps, ordered so that
// container/heap pops the earliest time first.
type timeHeap []time.Time

// Len reports how many timestamps the heap holds.
func (h timeHeap) Len() int {
	return len(h)
}

// Less reports whether the timestamp at i is strictly earlier than the one at j.
func (h timeHeap) Less(i, j int) bool {
	return h[j].After(h[i])
}

// Swap exchanges the timestamps at i and j.
func (h timeHeap) Swap(i, j int) {
	tmp := h[i]
	h[i] = h[j]
	h[j] = tmp
}

// Push appends x, which must be a time.Time, to the end of the slice.
func (h *timeHeap) Push(x interface{}) {
	t := x.(time.Time)
	*h = append(*h, t)
}

// Pop removes and returns the last element of the slice.
func (h *timeHeap) Pop() interface{} {
	last := len(*h) - 1
	t := (*h)[last]
	*h = (*h)[:last]
	return t
}
121
122 // gc checks if the number of instances in the state is greater than maximum.
123 // If yes, purges excessive oldest instances.
124 func (c *InstanceCache) gc(state *messages.InstanceCache) {
125 garbageSize := len(state.Entries) - instanceCacheMaxSize
126 if garbageSize <= 0 {
127 return
128 }
129
130 // Compute cutoff date by putting all access times to a heap
131 // and pop from it garbageSize times.
132 lastAccessTimes := make(timeHeap, 0, len(state.Entries))
133 for _, s := range state.Entries {
134 lastAccessTimes = append(lastAccessTimes, s.LastAccess.Time())
135 }
136 heap.Init(&lastAccessTimes)
137 for i := 0; i < garbageSize-1; i++ {
138 heap.Pop(&lastAccessTimes)
139 }
140 cutOff := heap.Pop(&lastAccessTimes).(time.Time)
141
142 // First garbageSize instances that were last accessed on or before cutO ff are garbage.
143 garbage := make([]string, 0, garbageSize)
144 // Map iteration is not deterministic, but it is fine.
145 for id, e := range state.Entries {
146 if !e.LastAccess.Time().After(cutOff) {
147 garbage = append(garbage, id)
148 if len(garbage) == cap(garbage) {
149 break
150 }
151 }
152 }
153
154 collected := 0
155 for _, id := range garbage {
156 path, err := c.fs.RootRelToAbs(id)
157 if err != nil {
158 panic("impossible")
159 }
160 if c.fs.EnsureFileGone(path) != nil {
161 // EnsureFileGone logs errors.
162 continue
163 }
164 delete(state.Entries, id)
165 collected++
166 }
167 c.logger.Infof("cipd: instance cache collected %d instances", collected)
168 }
169
170 // readState loads cache state from the state file.
171 // If the file does not exist, corrupted or its state was not synchronized
172 // with the instance files for a long time, synchronizes it.
173 // Newly discovered files are considered last accessed at zero time.
174 // If synchronization fails, then the state is considered empty.
175 func (c *InstanceCache) readState(state *messages.InstanceCache, now time.Time) {
176 statePath, err := c.fs.RootRelToAbs(instanceCacheStateFilename)
177 if err != nil {
178 panic("impossible")
179 }
180
181 stateBytes, err := ioutil.ReadFile(statePath)
182 sync := false
183 switch {
184 case os.IsNotExist(err):
185 sync = true
186
187 case err != nil:
188 c.logger.Warningf("cipd: could not read instance cache - %s", er r)
189 sync = true
190
191 default:
192 if err := UnmarshalWithSHA1(stateBytes, state); err != nil {
193 c.logger.Warningf("cipd: instance cache file is corrupte d - %s", err)
194 *state = messages.InstanceCache{}
195 sync = true
196 } else {
197 cutOff := now.
198 Add(-instanceCacheSyncInterval).
199 Add(time.Duration(rand.Int63n(int64(5 * time.Min ute))))
200 sync = state.LastSynced.Time().Before(cutOff)
201 }
202 }
203
204 if sync {
205 if err := c.syncState(state, now); err != nil {
206 c.logger.Warningf("cipd: failed to sync instance cache - %s", err)
207 }
208 c.gc(state)
209 }
210 }
211
212 // syncState synchronizes the list of instances in the state file with instance files.
213 // Preserves lastAccess of existing instances. Newly discovered files are
214 // considered last accessed at zero time.
215 func (c *InstanceCache) syncState(state *messages.InstanceCache, now time.Time) error {
216 root, err := os.Open(c.fs.Root())
217 switch {
218
219 case os.IsNotExist(err):
220 state.Entries = nil
221
222 case err != nil:
223 return err
224
225 default:
226 instanceIDs, err := root.Readdirnames(0)
227 if err != nil {
228 return err
229 }
230 existingIDs := stringset.New(len(instanceIDs))
231 for _, id := range instanceIDs {
232 if common.ValidateInstanceID(id) != nil {
233 continue
234 }
235 existingIDs.Add(id)
236
237 if _, ok := state.Entries[id]; !ok {
238 if state.Entries == nil {
239 state.Entries = map[string]*messages.Ins tanceCache_Entry{}
240 }
241 state.Entries[id] = &messages.InstanceCache_Entr y{}
242 }
243 }
244
245 for id := range state.Entries {
246 if !existingIDs.Has(id) {
247 delete(state.Entries, id)
248 }
249 }
250 }
251
252 state.LastSynced = google.NewTimestamp(now)
253 c.logger.Infof("cipd: synchronized instance cache with instance files")
254 return nil
255 }
256
257 // saveState persists the cache state.
258 func (c *InstanceCache) saveState(state *messages.InstanceCache) error {
259 stateBytes, err := MarshalWithSHA1(state)
260 if err != nil {
261 return err
262 }
263
264 statePath, err := c.fs.RootRelToAbs(instanceCacheStateFilename)
265 if err != nil {
266 panic("impossible")
267 }
268
269 return local.EnsureFile(c.fs, statePath, bytes.NewReader(stateBytes))
270 }
271
272 // withState loads cache state from the state file, calls f and saves it back.
273 // See also readState.
274 func (c *InstanceCache) withState(now time.Time, f func(*messages.InstanceCache) ) {
275 c.stateLock.Lock()
276 defer c.stateLock.Unlock()
277
278 state := &messages.InstanceCache{}
279
280 start := time.Now()
281 c.readState(state, now)
282 loadSaveTime := time.Since(start)
283
284 f(state)
285
286 start = time.Now()
287 if err := c.saveState(state); err != nil {
288 c.logger.Warningf("cipd: could not save instance cache - %s", er r)
289 }
290 loadSaveTime += time.Since(start)
291
292 if loadSaveTime > time.Second {
293 c.logger.Warningf("cipd: loading and saving instance cache with %d entries took %s", len(state.Entries), loadSaveTime)
294 }
295 }
296
297 // getAccessTime returns last access time of an instance.
298 // Used for testing.
299 func (c *InstanceCache) getAccessTime(now time.Time, pin common.Pin) (lastAccess time.Time, ok bool) {
300 c.withState(now, func(s *messages.InstanceCache) {
301 var entry *messages.InstanceCache_Entry
302 if entry, ok = s.Entries[pin.InstanceID]; ok {
303 lastAccess = entry.LastAccess.Time()
304 }
305 })
306 return
307 }
308
309 // touch updates/adds last access time for an instance.
310 func touch(state *messages.InstanceCache, instanceID string, now time.Time) {
311 entry := state.Entries[instanceID]
312 if entry == nil {
313 entry = &messages.InstanceCache_Entry{}
314 if state.Entries == nil {
315 state.Entries = map[string]*messages.InstanceCache_Entry {}
316 }
317 state.Entries[instanceID] = entry
318 }
319 entry.LastAccess = google.NewTimestamp(now)
320 }
OLDNEW
« no previous file with comments | « client/cipd/client.go ('k') | client/cipd/internal/instancecache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698