| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package internal |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "container/heap" |
| 10 "fmt" |
| 11 "io" |
| 12 "io/ioutil" |
| 13 "math/rand" |
| 14 "os" |
| 15 "sync" |
| 16 "time" |
| 17 |
| 18 "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/proto/google" |
| 20 "github.com/luci/luci-go/common/stringset" |
| 21 |
| 22 "github.com/luci/luci-go/client/cipd/common" |
| 23 "github.com/luci/luci-go/client/cipd/internal/messages" |
| 24 "github.com/luci/luci-go/client/cipd/local" |
| 25 ) |
| 26 |
| 27 const ( |
| 28 instanceCacheMaxSize = 300 |
| 29 // instanceCacheSyncInterval determines the frequency of |
| 30 // synchronization of state.db with instance files in the cache dir. |
| 31 instanceCacheSyncInterval = 8 * time.Hour |
| 32 instanceCacheStateFilename = "state.db" |
| 33 ) |
| 34 |
| 35 // InstanceCache is a file-system-based, thread-safe, LRU cache of instances. |
| 36 // |
| 37 // Does not validate instance hashes; it is caller's responsibility. |
| 38 type InstanceCache struct { |
| 39 fs local.FileSystem |
| 40 stateLock sync.Mutex // synchronizes access to the state file. |
| 41 logger logging.Logger |
| 42 } |
| 43 |
| 44 // NewInstanceCache initializes InstanceCache. |
| 45 // fs will be the root of the cache. |
| 46 func NewInstanceCache(fs local.FileSystem, logger logging.Logger) *InstanceCache
{ |
| 47 if logger == nil { |
| 48 logger = logging.Null() |
| 49 } |
| 50 return &InstanceCache{ |
| 51 fs: fs, |
| 52 logger: logger, |
| 53 } |
| 54 } |
| 55 |
| 56 // Get searches for the instance in the cache and writes its contents to output. |
| 57 // If the instance is not found, returns an os.IsNotExists error without writing |
| 58 // to output. |
| 59 func (c *InstanceCache) Get(pin common.Pin, output io.Writer, now time.Time) err
or { |
| 60 if err := common.ValidatePin(pin); err != nil { |
| 61 return err |
| 62 } |
| 63 |
| 64 path, err := c.fs.RootRelToAbs(pin.InstanceID) |
| 65 if err != nil { |
| 66 return fmt.Errorf("invalid instance ID %q", pin.InstanceID) |
| 67 } |
| 68 |
| 69 f, err := os.Open(path) |
| 70 if err != nil { |
| 71 return err |
| 72 } |
| 73 defer f.Close() |
| 74 |
| 75 c.withState(now, func(s *messages.InstanceCache) { |
| 76 touch(s, pin.InstanceID, now) |
| 77 }) |
| 78 |
| 79 _, err = io.Copy(output, f) |
| 80 return err |
| 81 } |
| 82 |
| 83 // Put caches an instance. |
| 84 // write must write the instance contents. |
| 85 // May remove some instances from the cache that were not accessed for a long ti
me. |
| 86 func (c *InstanceCache) Put(pin common.Pin, now time.Time, write func(*os.File)
error) error { |
| 87 if err := common.ValidatePin(pin); err != nil { |
| 88 return err |
| 89 } |
| 90 path, err := c.fs.RootRelToAbs(pin.InstanceID) |
| 91 if err != nil { |
| 92 return fmt.Errorf("invalid instance ID %q", pin.InstanceID) |
| 93 } |
| 94 |
| 95 if err := c.fs.EnsureFile(path, write); err != nil { |
| 96 return err |
| 97 } |
| 98 |
| 99 c.withState(now, func(s *messages.InstanceCache) { |
| 100 touch(s, pin.InstanceID, now) |
| 101 c.gc(s) |
| 102 }) |
| 103 return nil |
| 104 } |
| 105 |
| 106 type timeHeap []time.Time |
| 107 |
| 108 func (h timeHeap) Len() int { return len(h) } |
| 109 func (h timeHeap) Less(i, j int) bool { return h[i].Before(h[j]) } |
| 110 func (h timeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } |
| 111 func (h *timeHeap) Push(x interface{}) { |
| 112 *h = append(*h, x.(time.Time)) |
| 113 } |
| 114 func (h *timeHeap) Pop() interface{} { |
| 115 old := *h |
| 116 n := len(old) |
| 117 x := old[n-1] |
| 118 *h = old[0 : n-1] |
| 119 return x |
| 120 } |
| 121 |
| 122 // gc checks if the number of instances in the state is greater than maximum. |
| 123 // If yes, purges excessive oldest instances. |
| 124 func (c *InstanceCache) gc(state *messages.InstanceCache) { |
| 125 garbageSize := len(state.Entries) - instanceCacheMaxSize |
| 126 if garbageSize <= 0 { |
| 127 return |
| 128 } |
| 129 |
| 130 // Compute cutoff date by putting all access times to a heap |
| 131 // and pop from it garbageSize times. |
| 132 lastAccessTimes := make(timeHeap, 0, len(state.Entries)) |
| 133 for _, s := range state.Entries { |
| 134 lastAccessTimes = append(lastAccessTimes, s.LastAccess.Time()) |
| 135 } |
| 136 heap.Init(&lastAccessTimes) |
| 137 for i := 0; i < garbageSize-1; i++ { |
| 138 heap.Pop(&lastAccessTimes) |
| 139 } |
| 140 cutOff := heap.Pop(&lastAccessTimes).(time.Time) |
| 141 |
| 142 // First garbageSize instances that were last accessed on or before cutO
ff are garbage. |
| 143 garbage := make([]string, 0, garbageSize) |
| 144 // Map iteration is not deterministic, but it is fine. |
| 145 for id, e := range state.Entries { |
| 146 if !e.LastAccess.Time().After(cutOff) { |
| 147 garbage = append(garbage, id) |
| 148 if len(garbage) == cap(garbage) { |
| 149 break |
| 150 } |
| 151 } |
| 152 } |
| 153 |
| 154 collected := 0 |
| 155 for _, id := range garbage { |
| 156 path, err := c.fs.RootRelToAbs(id) |
| 157 if err != nil { |
| 158 panic("impossible") |
| 159 } |
| 160 if c.fs.EnsureFileGone(path) != nil { |
| 161 // EnsureFileGone logs errors. |
| 162 continue |
| 163 } |
| 164 delete(state.Entries, id) |
| 165 collected++ |
| 166 } |
| 167 c.logger.Infof("cipd: instance cache collected %d instances", collected) |
| 168 } |
| 169 |
| 170 // readState loads cache state from the state file. |
| 171 // If the file does not exist, corrupted or its state was not synchronized |
| 172 // with the instance files for a long time, synchronizes it. |
| 173 // Newly discovered files are considered last accessed at zero time. |
| 174 // If synchronization fails, then the state is considered empty. |
| 175 func (c *InstanceCache) readState(state *messages.InstanceCache, now time.Time)
{ |
| 176 statePath, err := c.fs.RootRelToAbs(instanceCacheStateFilename) |
| 177 if err != nil { |
| 178 panic("impossible") |
| 179 } |
| 180 |
| 181 stateBytes, err := ioutil.ReadFile(statePath) |
| 182 sync := false |
| 183 switch { |
| 184 case os.IsNotExist(err): |
| 185 sync = true |
| 186 |
| 187 case err != nil: |
| 188 c.logger.Warningf("cipd: could not read instance cache - %s", er
r) |
| 189 sync = true |
| 190 |
| 191 default: |
| 192 if err := UnmarshalWithSHA1(stateBytes, state); err != nil { |
| 193 c.logger.Warningf("cipd: instance cache file is corrupte
d - %s", err) |
| 194 *state = messages.InstanceCache{} |
| 195 sync = true |
| 196 } else { |
| 197 cutOff := now. |
| 198 Add(-instanceCacheSyncInterval). |
| 199 Add(time.Duration(rand.Int63n(int64(5 * time.Min
ute)))) |
| 200 sync = state.LastSynced.Time().Before(cutOff) |
| 201 } |
| 202 } |
| 203 |
| 204 if sync { |
| 205 if err := c.syncState(state, now); err != nil { |
| 206 c.logger.Warningf("cipd: failed to sync instance cache -
%s", err) |
| 207 } |
| 208 c.gc(state) |
| 209 } |
| 210 } |
| 211 |
| 212 // syncState synchronizes the list of instances in the state file with instance
files. |
| 213 // Preserves lastAccess of existing instances. Newly discovered files are |
| 214 // considered last accessed at zero time. |
| 215 func (c *InstanceCache) syncState(state *messages.InstanceCache, now time.Time)
error { |
| 216 root, err := os.Open(c.fs.Root()) |
| 217 switch { |
| 218 |
| 219 case os.IsNotExist(err): |
| 220 state.Entries = nil |
| 221 |
| 222 case err != nil: |
| 223 return err |
| 224 |
| 225 default: |
| 226 instanceIDs, err := root.Readdirnames(0) |
| 227 if err != nil { |
| 228 return err |
| 229 } |
| 230 existingIDs := stringset.New(len(instanceIDs)) |
| 231 for _, id := range instanceIDs { |
| 232 if common.ValidateInstanceID(id) != nil { |
| 233 continue |
| 234 } |
| 235 existingIDs.Add(id) |
| 236 |
| 237 if _, ok := state.Entries[id]; !ok { |
| 238 if state.Entries == nil { |
| 239 state.Entries = map[string]*messages.Ins
tanceCache_Entry{} |
| 240 } |
| 241 state.Entries[id] = &messages.InstanceCache_Entr
y{} |
| 242 } |
| 243 } |
| 244 |
| 245 for id := range state.Entries { |
| 246 if !existingIDs.Has(id) { |
| 247 delete(state.Entries, id) |
| 248 } |
| 249 } |
| 250 } |
| 251 |
| 252 state.LastSynced = google.NewTimestamp(now) |
| 253 c.logger.Infof("cipd: synchronized instance cache with instance files") |
| 254 return nil |
| 255 } |
| 256 |
| 257 // saveState persists the cache state. |
| 258 func (c *InstanceCache) saveState(state *messages.InstanceCache) error { |
| 259 stateBytes, err := MarshalWithSHA1(state) |
| 260 if err != nil { |
| 261 return err |
| 262 } |
| 263 |
| 264 statePath, err := c.fs.RootRelToAbs(instanceCacheStateFilename) |
| 265 if err != nil { |
| 266 panic("impossible") |
| 267 } |
| 268 |
| 269 return local.EnsureFile(c.fs, statePath, bytes.NewReader(stateBytes)) |
| 270 } |
| 271 |
| 272 // withState loads cache state from the state file, calls f and saves it back. |
| 273 // See also readState. |
| 274 func (c *InstanceCache) withState(now time.Time, f func(*messages.InstanceCache)
) { |
| 275 c.stateLock.Lock() |
| 276 defer c.stateLock.Unlock() |
| 277 |
| 278 state := &messages.InstanceCache{} |
| 279 |
| 280 start := time.Now() |
| 281 c.readState(state, now) |
| 282 loadSaveTime := time.Since(start) |
| 283 |
| 284 f(state) |
| 285 |
| 286 start = time.Now() |
| 287 if err := c.saveState(state); err != nil { |
| 288 c.logger.Warningf("cipd: could not save instance cache - %s", er
r) |
| 289 } |
| 290 loadSaveTime += time.Since(start) |
| 291 |
| 292 if loadSaveTime > time.Second { |
| 293 c.logger.Warningf("cipd: loading and saving instance cache with
%d entries took %s", len(state.Entries), loadSaveTime) |
| 294 } |
| 295 } |
| 296 |
| 297 // getAccessTime returns last access time of an instance. |
| 298 // Used for testing. |
| 299 func (c *InstanceCache) getAccessTime(now time.Time, pin common.Pin) (lastAccess
time.Time, ok bool) { |
| 300 c.withState(now, func(s *messages.InstanceCache) { |
| 301 var entry *messages.InstanceCache_Entry |
| 302 if entry, ok = s.Entries[pin.InstanceID]; ok { |
| 303 lastAccess = entry.LastAccess.Time() |
| 304 } |
| 305 }) |
| 306 return |
| 307 } |
| 308 |
| 309 // touch updates/adds last access time for an instance. |
| 310 func touch(state *messages.InstanceCache, instanceID string, now time.Time) { |
| 311 entry := state.Entries[instanceID] |
| 312 if entry == nil { |
| 313 entry = &messages.InstanceCache_Entry{} |
| 314 if state.Entries == nil { |
| 315 state.Entries = map[string]*messages.InstanceCache_Entry
{} |
| 316 } |
| 317 state.Entries[instanceID] = entry |
| 318 } |
| 319 entry.LastAccess = google.NewTimestamp(now) |
| 320 } |
| OLD | NEW |