Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(408)

Unified Diff: client/cipd/internal/instancecache.go

Issue 1870263002: cipd: instance cache (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: restore check, improve comment Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « client/cipd/client.go ('k') | client/cipd/internal/instancecache_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/cipd/internal/instancecache.go
diff --git a/client/cipd/internal/instancecache.go b/client/cipd/internal/instancecache.go
new file mode 100644
index 0000000000000000000000000000000000000000..48e3bd3be85887593c377a7ba27a3f3fac87dbef
--- /dev/null
+++ b/client/cipd/internal/instancecache.go
@@ -0,0 +1,320 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package internal
+
+import (
+ "bytes"
+ "container/heap"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/proto/google"
+ "github.com/luci/luci-go/common/stringset"
+
+ "github.com/luci/luci-go/client/cipd/common"
+ "github.com/luci/luci-go/client/cipd/internal/messages"
+ "github.com/luci/luci-go/client/cipd/local"
+)
+
+const (
+ instanceCacheMaxSize = 300 // entry count above which gc starts purging the oldest instances
+ // instanceCacheSyncInterval determines how often state.db is
+ // re-synchronized with the instance files present in the cache dir.
+ instanceCacheSyncInterval = 8 * time.Hour
+ instanceCacheStateFilename = "state.db" // state file name, resolved relative to the cache root
+)
+
+// InstanceCache is a file-system-based, thread-safe, LRU cache of instances.
+// Instance files and the state file (state.db) live under the root of fs.
+// Does not validate instance hashes; that is the caller's responsibility.
+type InstanceCache struct {
+ fs local.FileSystem // cache root; instance files are named by instance ID
+ stateLock sync.Mutex // synchronizes access to the state file.
+ logger logging.Logger // never nil when built via NewInstanceCache
+}
+
+// NewInstanceCache returns an InstanceCache whose files live under the root
+// of fs.
+// A nil logger is replaced with logging.Null().
+func NewInstanceCache(fs local.FileSystem, logger logging.Logger) *InstanceCache {
+	cache := &InstanceCache{fs: fs, logger: logger}
+	if cache.logger == nil {
+		// Substitute the null logger so methods may log unconditionally.
+		cache.logger = logging.Null()
+	}
+	return cache
+}
+
+// Get looks up the instance in the cache and streams its contents to output.
+// If the instance is not in the cache, the os.Open error is returned unchanged
+// (os.IsNotExist reports true for it) and nothing is written to output.
+func (c *InstanceCache) Get(pin common.Pin, output io.Writer, now time.Time) error {
+	err := common.ValidatePin(pin)
+	if err != nil {
+		return err
+	}
+	instancePath, err := c.fs.RootRelToAbs(pin.InstanceID)
+	if err != nil {
+		return fmt.Errorf("invalid instance ID %q", pin.InstanceID)
+	}
+
+	file, err := os.Open(instancePath)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	// Record the access before copying the contents out.
+	c.withState(now, func(state *messages.InstanceCache) {
+		touch(state, pin.InstanceID, now)
+	})
+	_, err = io.Copy(output, file)
+	return err
+}
+
+// Put stores an instance in the cache under pin.InstanceID.
+// write is handed to fs.EnsureFile and must write the instance contents.
+// May evict instances that were not accessed for a long time.
+func (c *InstanceCache) Put(pin common.Pin, now time.Time, write func(*os.File) error) error {
+	err := common.ValidatePin(pin)
+	if err != nil {
+		return err
+	}
+	instancePath, err := c.fs.RootRelToAbs(pin.InstanceID)
+	if err != nil {
+		return fmt.Errorf("invalid instance ID %q", pin.InstanceID)
+	}
+	if err = c.fs.EnsureFile(instancePath, write); err != nil {
+		return err
+	}
+
+	c.withState(now, func(state *messages.InstanceCache) {
+		touch(state, pin.InstanceID, now)
+		c.gc(state)
+	})
+	return nil
+}
+
+type timeHeap []time.Time // min-heap of timestamps; implements heap.Interface for gc
+
+func (h timeHeap) Len() int { return len(h) }
+func (h timeHeap) Less(i, j int) bool { return h[i].Before(h[j]) } // earliest time sorts first
+func (h timeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+func (h *timeHeap) Push(x interface{}) {
+ *h = append(*h, x.(time.Time))
+}
+func (h *timeHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ x := old[n-1] // container/heap moves the minimum to the end before calling Pop
+ *h = old[0 : n-1]
+ return x
+}
+
+// gc checks if the number of instances in the state is greater than maximum.
+// If yes, purges excessive oldest instances.
+func (c *InstanceCache) gc(state *messages.InstanceCache) {
+ garbageSize := len(state.Entries) - instanceCacheMaxSize
+ if garbageSize <= 0 {
+ return
+ }
+
+ // Compute cutoff date by putting all access times to a heap
+ // and pop from it garbageSize times.
+ lastAccessTimes := make(timeHeap, 0, len(state.Entries))
+ for _, s := range state.Entries {
+ lastAccessTimes = append(lastAccessTimes, s.LastAccess.Time())
+ }
+ heap.Init(&lastAccessTimes)
+ for i := 0; i < garbageSize-1; i++ {
+ heap.Pop(&lastAccessTimes)
+ }
+ cutOff := heap.Pop(&lastAccessTimes).(time.Time)
+
+ // First garbageSize instances that were last accessed on or before cutOff are garbage.
+ garbage := make([]string, 0, garbageSize)
+ // Map iteration is not deterministic, but it is fine.
+ for id, e := range state.Entries {
+ if !e.LastAccess.Time().After(cutOff) {
+ garbage = append(garbage, id)
+ if len(garbage) == cap(garbage) {
+ break
+ }
+ }
+ }
+
+ collected := 0
+ for _, id := range garbage {
+ path, err := c.fs.RootRelToAbs(id)
+ if err != nil {
+ panic("impossible")
+ }
+ if c.fs.EnsureFileGone(path) != nil {
+ // EnsureFileGone logs errors.
+ continue
+ }
+ delete(state.Entries, id)
+ collected++
+ }
+ c.logger.Infof("cipd: instance cache collected %d instances", collected)
+}
+
+// readState loads the cache state from the state file into state.
+// When the file is missing, unreadable, corrupted, or its LastSynced is older
+// than roughly instanceCacheSyncInterval, the state is synchronized with the
+// instance files and then gc'ed. A synchronization failure is only logged.
+// Newly discovered files are considered last accessed at zero time.
+func (c *InstanceCache) readState(state *messages.InstanceCache, now time.Time) {
+	statePath, err := c.fs.RootRelToAbs(instanceCacheStateFilename)
+	if err != nil {
+		panic("impossible")
+	}
+
+	needSync := true
+	stateBytes, err := ioutil.ReadFile(statePath)
+	switch {
+	case os.IsNotExist(err):
+		// No state file yet; rebuild it without a warning.
+	case err != nil:
+		c.logger.Warningf("cipd: could not read instance cache - %s", err)
+	default:
+		if err = UnmarshalWithSHA1(stateBytes, state); err != nil {
+			c.logger.Warningf("cipd: instance cache file is corrupted - %s", err)
+			// Drop anything a failed decode may have left behind.
+			*state = messages.InstanceCache{}
+			break
+		}
+		// The cutoff is shifted forward by a random amount below 5 minutes,
+		// presumably so that concurrent processes do not all sync at once.
+		jitter := time.Duration(rand.Int63n(int64(5 * time.Minute)))
+		cutOff := now.Add(-instanceCacheSyncInterval).Add(jitter)
+		needSync = state.LastSynced.Time().Before(cutOff)
+	}
+
+	if !needSync {
+		return
+	}
+	if err := c.syncState(state, now); err != nil {
+		c.logger.Warningf("cipd: failed to sync instance cache - %s", err)
+	}
+	// gc runs even when the synchronization above failed.
+	c.gc(state)
+}
+
+// syncState synchronizes the list of instances in the state with the instance
+// files found in the cache root. Preserves lastAccess of existing instances.
+// Newly discovered files are considered last accessed at zero time.
+// Returns an error, leaving state untouched, if the root cannot be listed.
+func (c *InstanceCache) syncState(state *messages.InstanceCache, now time.Time) error {
+	root, err := os.Open(c.fs.Root())
+	switch {
+	case os.IsNotExist(err):
+		state.Entries = nil
+	case err != nil:
+		return err
+	default:
+		// Release the directory handle once the listing is done.
+		defer root.Close()
+		instanceIDs, err := root.Readdirnames(0)
+		if err != nil {
+			return err
+		}
+		existingIDs := stringset.New(len(instanceIDs))
+		for _, id := range instanceIDs {
+			// Skip names that are not valid instance IDs.
+			if common.ValidateInstanceID(id) != nil {
+				continue
+			}
+			existingIDs.Add(id)
+			if _, ok := state.Entries[id]; !ok {
+				if state.Entries == nil {
+					state.Entries = map[string]*messages.InstanceCache_Entry{}
+				}
+				state.Entries[id] = &messages.InstanceCache_Entry{}
+			}
+		}
+		// Forget entries whose files have disappeared.
+		for id := range state.Entries {
+			if !existingIDs.Has(id) {
+				delete(state.Entries, id)
+			}
+		}
+	}
+
+	state.LastSynced = google.NewTimestamp(now)
+	c.logger.Infof("cipd: synchronized instance cache with instance files")
+	return nil
+}
+
+// saveState serializes state with MarshalWithSHA1 and writes it to the state file.
+func (c *InstanceCache) saveState(state *messages.InstanceCache) error {
+	blob, marshalErr := MarshalWithSHA1(state)
+	if marshalErr != nil {
+		return marshalErr
+	}
+
+	// Resolving the constant state file name is expected never to fail.
+	dest, pathErr := c.fs.RootRelToAbs(instanceCacheStateFilename)
+	if pathErr != nil {
+		panic("impossible")
+	}
+	return local.EnsureFile(c.fs, dest, bytes.NewReader(blob))
+}
+
+// withState loads cache state from the state file, calls f and saves it back.
+// See also readState.
+func (c *InstanceCache) withState(now time.Time, f func(*messages.InstanceCache)) {
+ c.stateLock.Lock()
+ defer c.stateLock.Unlock()
+
+ state := &messages.InstanceCache{}
+
+ start := time.Now()
+ c.readState(state, now)
+ loadSaveTime := time.Since(start)
+
+ f(state)
+
+ start = time.Now()
+ if err := c.saveState(state); err != nil {
+ c.logger.Warningf("cipd: could not save instance cache - %s", err)
+ }
+ loadSaveTime += time.Since(start)
+
+ if loadSaveTime > time.Second {
+ c.logger.Warningf("cipd: loading and saving instance cache with %d entries took %s", len(state.Entries), loadSaveTime)
+ }
+}
+
+// getAccessTime returns last access time of an instance.
+// Used for testing.
+func (c *InstanceCache) getAccessTime(now time.Time, pin common.Pin) (lastAccess time.Time, ok bool) {
+ c.withState(now, func(s *messages.InstanceCache) {
+ var entry *messages.InstanceCache_Entry
+ if entry, ok = s.Entries[pin.InstanceID]; ok {
+ lastAccess = entry.LastAccess.Time()
+ }
+ })
+ return
+}
+
+// touch sets the last access time of instanceID to now, adding the entry first if needed.
+func touch(state *messages.InstanceCache, instanceID string, now time.Time) {
+	if state.Entries == nil {
+		state.Entries = map[string]*messages.InstanceCache_Entry{}
+	}
+	entry := state.Entries[instanceID]
+	if entry == nil {
+		entry = &messages.InstanceCache_Entry{}
+		state.Entries[instanceID] = entry
+	}
+	entry.LastAccess = google.NewTimestamp(now)
+}
« no previous file with comments | « client/cipd/client.go ('k') | client/cipd/internal/instancecache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698