Chromium Code Reviews| Index: client/cipd/internal/instancecache.go |
| diff --git a/client/cipd/internal/instancecache.go b/client/cipd/internal/instancecache.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..48e3bd3be85887593c377a7ba27a3f3fac87dbef |
| --- /dev/null |
| +++ b/client/cipd/internal/instancecache.go |
| @@ -0,0 +1,320 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package internal |
| + |
| +import ( |
| + "bytes" |
| + "container/heap" |
| + "fmt" |
| + "io" |
| + "io/ioutil" |
| + "math/rand" |
| + "os" |
| + "sync" |
| + "time" |
| + |
| + "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/proto/google" |
| + "github.com/luci/luci-go/common/stringset" |
| + |
| + "github.com/luci/luci-go/client/cipd/common" |
| + "github.com/luci/luci-go/client/cipd/internal/messages" |
| + "github.com/luci/luci-go/client/cipd/local" |
| +) |
| + |
const (
	// instanceCacheMaxSize is the number of instances the cache may hold
	// before garbage collection starts purging the oldest ones.
	instanceCacheMaxSize = 300

	// instanceCacheSyncInterval determines how often state.db is
	// synchronized with the instance files in the cache dir.
	instanceCacheSyncInterval = time.Hour * 8

	// instanceCacheStateFilename is the name of the state file, relative to
	// the cache root.
	instanceCacheStateFilename = "state.db"
)
| + |
| +// InstanceCache is a file-system-based, thread-safe, LRU cache of instances. |
| +// |
| +// Does not validate instance hashes; it is caller's responsibility. |
| +type InstanceCache struct { |
| + fs local.FileSystem |
| + stateLock sync.Mutex // synchronizes access to the state file. |
| + logger logging.Logger |
| +} |
| + |
| +// NewInstanceCache initializes InstanceCache. |
| +// fs will be the root of the cache. |
| +func NewInstanceCache(fs local.FileSystem, logger logging.Logger) *InstanceCache { |
| + if logger == nil { |
| + logger = logging.Null() |
| + } |
| + return &InstanceCache{ |
| + fs: fs, |
| + logger: logger, |
| + } |
| +} |
| + |
| +// Get searches for the instance in the cache and writes its contents to output. |
| +// If the instance is not found, returns an os.IsNotExists error without writing |
| +// to output. |
| +func (c *InstanceCache) Get(pin common.Pin, output io.Writer, now time.Time) error { |
| + if err := common.ValidatePin(pin); err != nil { |
| + return err |
| + } |
| + |
| + path, err := c.fs.RootRelToAbs(pin.InstanceID) |
| + if err != nil { |
| + return fmt.Errorf("invalid instance ID %q", pin.InstanceID) |
| + } |
| + |
| + f, err := os.Open(path) |
| + if err != nil { |
| + return err |
| + } |
| + defer f.Close() |
| + |
| + c.withState(now, func(s *messages.InstanceCache) { |
| + touch(s, pin.InstanceID, now) |
| + }) |
| + |
| + _, err = io.Copy(output, f) |
| + return err |
| +} |
| + |
| +// Put caches an instance. |
| +// write must write the instance contents. |
| +// May remove some instances from the cache that were not accessed for a long time. |
| +func (c *InstanceCache) Put(pin common.Pin, now time.Time, write func(*os.File) error) error { |
|
Vadim Sh.
2016/04/13 21:49:02
btw, why *os.File and not io.WriteSeeker?
nodir
2016/04/13 23:23:52
because *os.File is what Put provides.
In general
|
| + if err := common.ValidatePin(pin); err != nil { |
| + return err |
| + } |
| + path, err := c.fs.RootRelToAbs(pin.InstanceID) |
| + if err != nil { |
| + return fmt.Errorf("invalid instance ID %q", pin.InstanceID) |
| + } |
| + |
| + if err := c.fs.EnsureFile(path, write); err != nil { |
| + return err |
| + } |
| + |
| + c.withState(now, func(s *messages.InstanceCache) { |
| + touch(s, pin.InstanceID, now) |
| + c.gc(s) |
| + }) |
| + return nil |
| +} |
| + |
// timeHeap is a min-heap of timestamps: with container/heap the earliest
// time is popped first.
type timeHeap []time.Time

// Len reports the number of timestamps in the heap.
func (h timeHeap) Len() int {
	return len(h)
}

// Less orders earlier timestamps before later ones.
func (h timeHeap) Less(i, j int) bool {
	return h[j].After(h[i])
}

// Swap exchanges the elements at positions i and j.
func (h timeHeap) Swap(i, j int) {
	h[j], h[i] = h[i], h[j]
}

// Push appends x, which must be a time.Time. Called by container/heap.
func (h *timeHeap) Push(x interface{}) {
	t := x.(time.Time)
	*h = append(*h, t)
}

// Pop removes and returns the last element. Called by container/heap.
func (h *timeHeap) Pop() interface{} {
	last := len(*h) - 1
	t := (*h)[last]
	*h = (*h)[:last]
	return t
}
| + |
| +// gc checks if the number of instances in the state is greater than maximum. |
| +// If yes, purges excessive oldest instances. |
| +func (c *InstanceCache) gc(state *messages.InstanceCache) { |
| + garbageSize := len(state.Entries) - instanceCacheMaxSize |
| + if garbageSize <= 0 { |
| + return |
| + } |
| + |
| + // Compute cutoff date by putting all access times to a heap |
| + // and pop from it garbageSize times. |
| + lastAccessTimes := make(timeHeap, 0, len(state.Entries)) |
| + for _, s := range state.Entries { |
| + lastAccessTimes = append(lastAccessTimes, s.LastAccess.Time()) |
| + } |
| + heap.Init(&lastAccessTimes) |
| + for i := 0; i < garbageSize-1; i++ { |
| + heap.Pop(&lastAccessTimes) |
| + } |
| + cutOff := heap.Pop(&lastAccessTimes).(time.Time) |
| + |
| + // First garbageSize instances that were last accessed on or before cutOff are garbage. |
| + garbage := make([]string, 0, garbageSize) |
| + // Map iteration is not deterministic, but it is fine. |
| + for id, e := range state.Entries { |
| + if !e.LastAccess.Time().After(cutOff) { |
| + garbage = append(garbage, id) |
| + if len(garbage) == cap(garbage) { |
| + break |
| + } |
| + } |
| + } |
| + |
| + collected := 0 |
| + for _, id := range garbage { |
| + path, err := c.fs.RootRelToAbs(id) |
| + if err != nil { |
| + panic("impossible") |
| + } |
| + if c.fs.EnsureFileGone(path) != nil { |
| + // EnsureFileGone logs errors. |
| + continue |
| + } |
| + delete(state.Entries, id) |
| + collected++ |
| + } |
| + c.logger.Infof("cipd: instance cache collected %d instances", collected) |
| +} |
| + |
| +// readState loads cache state from the state file. |
| +// If the file does not exist, corrupted or its state was not synchronized |
| +// with the instance files for a long time, synchronizes it. |
| +// Newly discovered files are considered last accessed at zero time. |
| +// If synchronization fails, then the state is considered empty. |
| +func (c *InstanceCache) readState(state *messages.InstanceCache, now time.Time) { |
| + statePath, err := c.fs.RootRelToAbs(instanceCacheStateFilename) |
| + if err != nil { |
| + panic("impossible") |
| + } |
| + |
| + stateBytes, err := ioutil.ReadFile(statePath) |
| + sync := false |
| + switch { |
| + case os.IsNotExist(err): |
| + sync = true |
| + |
| + case err != nil: |
| + c.logger.Warningf("cipd: could not read instance cache - %s", err) |
| + sync = true |
| + |
| + default: |
| + if err := UnmarshalWithSHA1(stateBytes, state); err != nil { |
| + c.logger.Warningf("cipd: instance cache file is corrupted - %s", err) |
| + *state = messages.InstanceCache{} |
| + sync = true |
| + } else { |
| + cutOff := now. |
| + Add(-instanceCacheSyncInterval). |
| + Add(time.Duration(rand.Int63n(int64(5 * time.Minute)))) |
| + sync = state.LastSynced.Time().Before(cutOff) |
| + } |
| + } |
| + |
| + if sync { |
| + if err := c.syncState(state, now); err != nil { |
| + c.logger.Warningf("cipd: failed to sync instance cache - %s", err) |
| + } |
| + c.gc(state) |
| + } |
| +} |
| + |
| +// syncState synchronizes the list of instances in the state file with instance files. |
| +// Preserves lastAccess of existing instances. Newly discovered files are |
| +// considered last accessed at zero time. |
| +func (c *InstanceCache) syncState(state *messages.InstanceCache, now time.Time) error { |
| + root, err := os.Open(c.fs.Root()) |
| + switch { |
| + |
| + case os.IsNotExist(err): |
| + state.Entries = nil |
| + |
| + case err != nil: |
| + return err |
| + |
| + default: |
| + instanceIDs, err := root.Readdirnames(0) |
| + if err != nil { |
| + return err |
| + } |
| + existingIDs := stringset.New(len(instanceIDs)) |
| + for _, id := range instanceIDs { |
| + if common.ValidateInstanceID(id) != nil { |
| + continue |
| + } |
| + existingIDs.Add(id) |
| + |
| + if _, ok := state.Entries[id]; !ok { |
| + if state.Entries == nil { |
| + state.Entries = map[string]*messages.InstanceCache_Entry{} |
| + } |
| + state.Entries[id] = &messages.InstanceCache_Entry{} |
| + } |
| + } |
| + |
| + for id := range state.Entries { |
| + if !existingIDs.Has(id) { |
| + delete(state.Entries, id) |
| + } |
| + } |
| + } |
| + |
| + state.LastSynced = google.NewTimestamp(now) |
| + c.logger.Infof("cipd: synchronized instance cache with instance files") |
| + return nil |
| +} |
| + |
| +// saveState persists the cache state. |
| +func (c *InstanceCache) saveState(state *messages.InstanceCache) error { |
| + stateBytes, err := MarshalWithSHA1(state) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + statePath, err := c.fs.RootRelToAbs(instanceCacheStateFilename) |
| + if err != nil { |
| + panic("impossible") |
| + } |
| + |
| + return local.EnsureFile(c.fs, statePath, bytes.NewReader(stateBytes)) |
|
Vadim Sh.
2016/04/13 21:49:02
we will need to teach EnsureFile to retry access e
nodir
2016/04/13 23:23:52
yeah, that's what I thought when I saw your CL
|
| +} |
| + |
| +// withState loads cache state from the state file, calls f and saves it back. |
| +// See also readState. |
| +func (c *InstanceCache) withState(now time.Time, f func(*messages.InstanceCache)) { |
| + c.stateLock.Lock() |
| + defer c.stateLock.Unlock() |
| + |
| + state := &messages.InstanceCache{} |
| + |
| + start := time.Now() |
| + c.readState(state, now) |
| + loadSaveTime := time.Since(start) |
| + |
| + f(state) |
| + |
| + start = time.Now() |
| + if err := c.saveState(state); err != nil { |
| + c.logger.Warningf("cipd: could not save instance cache - %s", err) |
| + } |
| + loadSaveTime += time.Since(start) |
| + |
| + if loadSaveTime > time.Second { |
| + c.logger.Warningf("cipd: loading and saving instance cache with %d entries took %s", len(state.Entries), loadSaveTime) |
| + } |
| +} |
| + |
| +// getAccessTime returns last access time of an instance. |
| +// Used for testing. |
| +func (c *InstanceCache) getAccessTime(now time.Time, pin common.Pin) (lastAccess time.Time, ok bool) { |
| + c.withState(now, func(s *messages.InstanceCache) { |
| + var entry *messages.InstanceCache_Entry |
| + if entry, ok = s.Entries[pin.InstanceID]; ok { |
| + lastAccess = entry.LastAccess.Time() |
| + } |
| + }) |
| + return |
| +} |
| + |
| +// touch updates/adds last access time for an instance. |
| +func touch(state *messages.InstanceCache, instanceID string, now time.Time) { |
| + entry := state.Entries[instanceID] |
| + if entry == nil { |
| + entry = &messages.InstanceCache_Entry{} |
| + if state.Entries == nil { |
| + state.Entries = map[string]*messages.InstanceCache_Entry{} |
| + } |
| + state.Entries[instanceID] = entry |
| + } |
| + entry.LastAccess = google.NewTimestamp(now) |
| +} |