| Index: client/cipd/internal/instancecache.go
|
| diff --git a/client/cipd/internal/instancecache.go b/client/cipd/internal/instancecache.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..65584ba72c2f626bbc9396676897eef8d6f1e511
|
| --- /dev/null
|
| +++ b/client/cipd/internal/instancecache.go
|
| @@ -0,0 +1,284 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package internal
|
| +
|
| +import (
|
| + "bytes"
|
| + "fmt"
|
| + "io"
|
| + "io/ioutil"
|
| + "math/rand"
|
| + "os"
|
| + "sync"
|
| + "time"
|
| +
|
| + "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/common/proto/google"
|
| + "github.com/luci/luci-go/common/stringset"
|
| +
|
| + "github.com/luci/luci-go/client/cipd/common"
|
| + "github.com/luci/luci-go/client/cipd/internal/messages"
|
| + "github.com/luci/luci-go/client/cipd/local"
|
| +)
|
| +
|
| +const (
|
| + instanceCacheMaxSize = 100
|
| + // instanceCacheSyncInterval determines the frequency of
|
| + // synchronization of state.db with files in the cache dir.
|
| + instanceCacheSyncInterval = 8 * time.Hour
|
| + instanceCacheStateFilename = "state.db"
|
| +)
|
| +
|
| +// InstanceCache is a file-system-based, thread-safe, LRU cache of instances.
|
| +//
|
| +// Does not validate instance hashes. It is caller responsibility to do so.
|
| +type InstanceCache struct {
|
| + fs local.FileSystem
|
| + lock sync.Mutex
|
| + logger logging.Logger
|
| + state messages.InstanceCache
|
| + dirty bool
|
| +}
|
| +
|
| +// LoadInstanceCache initializes InstanceCache.
|
| +// fs will be root of the instance cache.
|
| +func LoadInstanceCache(fs local.FileSystem, logger logging.Logger, now time.Time) *InstanceCache {
|
| + if logger == nil {
|
| + logger = logging.Null()
|
| + }
|
| + cache := &InstanceCache{
|
| + fs: fs,
|
| + logger: logger,
|
| + }
|
| + cache.readState(now)
|
| + return cache
|
| +}
|
| +
|
| +// Get searches for the instance in cache and writes its contents to output.
|
| +// If the instance is not found, returns an os.IsNotExists error without writing
|
| +// to output.
|
| +func (c *InstanceCache) Get(pin common.Pin, output io.Writer, now time.Time) error {
|
| + c.lock.Lock()
|
| + defer c.lock.Unlock()
|
| +
|
| + path, err := c.fs.RootRelToAbs(pin.InstanceID)
|
| + if err != nil {
|
| + return fmt.Errorf("invalid instance ID %q", pin.InstanceID)
|
| + }
|
| +
|
| + f, err := os.Open(path)
|
| + if err != nil {
|
| + if os.IsNotExist(err) {
|
| + delete(c.state.Entries, pin.InstanceID)
|
| + }
|
| + return err
|
| + }
|
| + defer f.Close()
|
| +
|
| + c.touch(pin.InstanceID, now)
|
| +
|
| + _, err = io.Copy(output, f)
|
| + return err
|
| +}
|
| +
|
| +// Put caches instance contents.
|
| +// May remove some packages that were not accessed for a long time.
|
| +func (c *InstanceCache) Put(pin common.Pin, now time.Time, write func(*os.File) error) error {
|
| + c.lock.Lock()
|
| + defer c.lock.Unlock()
|
| +
|
| + c.touch(pin.InstanceID, now)
|
| +
|
| + if err := c.fs.EnsureFile(pin.InstanceID, write); err != nil {
|
| + return err
|
| + }
|
| +
|
| + c.gc()
|
| + return nil
|
| +}
|
| +
|
| +// Save persists cache state to the state file.
|
| +func (c *InstanceCache) Save() error {
|
| + return c.saveState()
|
| +}
|
| +
|
| +// Dirty returns true if cache state differs from the state file.
|
| +func (c *InstanceCache) Dirty() bool {
|
| + return c.dirty
|
| +}
|
| +
|
| +// gc checks if the number of instances in the state is greater than maximum.
|
| +// If yes, purges excessive oldest instances.
|
| +func (c *InstanceCache) gc() {
|
| + n := len(c.state.Entries)
|
| + m := instanceCacheMaxSize
|
| + if n <= m {
|
| + return
|
| + }
|
| +
|
| + // Among all instances, leave only top m recently-accessed.
|
| + // Find the m-th recently-accessed instance.
|
| + found := 0
|
| + var cutOff time.Time // lastAccess of found-th most-recently-accessed instance
|
| + for _, s := range c.state.Entries {
|
| + lastAccess := s.LastAccess.Time()
|
| + switch {
|
| + case found == 0:
|
| + cutOff = lastAccess
|
| + found++
|
| +
|
| + case found < m:
|
| + if lastAccess.Before(cutOff) {
|
| + cutOff = lastAccess
|
| + }
|
| + found++
|
| +
|
| + case found == m:
|
| + if lastAccess.After(cutOff) {
|
| + cutOff = lastAccess
|
| + }
|
| + }
|
| + }
|
| +
|
| + // First m-n instances that were last accessed before cutOff are garbage.
|
| + garbage := make([]string, 0, m-n)
|
| + // Map iteration is not deterministic, but it is fine.
|
| + for id, e := range c.state.Entries {
|
| + if e.LastAccess.Time().Before(cutOff) {
|
| + garbage = append(garbage, id)
|
| + if len(garbage) == cap(garbage) {
|
| + break
|
| + }
|
| + }
|
| + }
|
| +
|
| + for _, id := range garbage {
|
| + path, err := c.fs.RootRelToAbs(id)
|
| + if err != nil {
|
| + panic("impossible")
|
| + }
|
| + if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
|
| + c.logger.Errorf("cipd: could not delete %s from cache - %s", path, err)
|
| + }
|
| + delete(c.state.Entries, id)
|
| + }
|
| +}
|
| +
|
| +// readState loads c.state from the state file.
|
| +// If the file does not exist, corrupted or its state was not synchronized
|
| +// with the instance files for a long time, synchronizes it.
|
| +// Newly discovered files are considered last accessed at EPOCH.
|
| +func (c *InstanceCache) readState(now time.Time) error {
|
| + statePath, err := c.fs.RootRelToAbs(instanceCacheStateFilename)
|
| + if err != nil {
|
| + panic("impossible")
|
| + }
|
| +
|
| + stateBytes, err := ioutil.ReadFile(statePath)
|
| + sync := false
|
| + switch {
|
| + case os.IsNotExist(err):
|
| + sync = true
|
| +
|
| + case err != nil:
|
| + c.logger.Errorf("cipd: could not read instance cache - %s", err)
|
| + sync = true
|
| +
|
| + default:
|
| + if err := UnmarshalWithSHA1(stateBytes, &c.state); err != nil {
|
| + c.logger.Errorf("cipd: instance cache file is corrupted - %s", err)
|
| + c.state = messages.InstanceCache{}
|
| + sync = true
|
| + } else {
|
| + cutOff := now.
|
| + Add(-instanceCacheSyncInterval).
|
| + Add(time.Duration(rand.Int63n(int64(5 * time.Minute))))
|
| + sync = c.state.LastSynced.Time().Before(cutOff)
|
| + }
|
| + }
|
| +
|
| + if sync {
|
| + if err := c.syncState(now); err != nil {
|
| + c.logger.Errorf("cipd: failed to sync instance cache - %s", err)
|
| + }
|
| + c.gc()
|
| + }
|
| + return nil
|
| +}
|
| +
|
| +// syncState synchronizes the list of instances with instance files.
|
| +// Preserves lastAccess of existing instances. Newly discovered files are
|
| +// considered last accessed at EPOCH.
|
| +func (c *InstanceCache) syncState(now time.Time) error {
|
| + root, err := os.Open(c.fs.Root())
|
| + switch {
|
| +
|
| + case os.IsNotExist(err):
|
| + c.state.Entries = nil
|
| +
|
| + case err != nil:
|
| + return err
|
| +
|
| + default:
|
| + instanceIDs, err := root.Readdirnames(0)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + existingIDs := stringset.New(len(instanceIDs))
|
| + for _, id := range instanceIDs {
|
| + if common.ValidateInstanceID(id) != nil {
|
| + continue
|
| + }
|
| + existingIDs.Add(id)
|
| +
|
| + if _, ok := c.state.Entries[id]; !ok {
|
| + c.ensureEntries()
|
| + c.state.Entries[id] = &messages.InstanceCache_Entry{}
|
| + }
|
| + }
|
| +
|
| + for id := range c.state.Entries {
|
| + if !existingIDs.Has(id) {
|
| + delete(c.state.Entries, id)
|
| + }
|
| + }
|
| + }
|
| +
|
| + c.state.LastSynced = google.NewTimestamp(now)
|
| + c.dirty = true
|
| + return nil
|
| +}
|
| +
|
| +// saveState persists the cache state.
|
| +func (c *InstanceCache) saveState() error {
|
| + stateBytes, err := MarshalWithSHA1(&c.state)
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + if err := local.EnsureFile(c.fs, instanceCacheStateFilename, bytes.NewReader(stateBytes)); err != nil {
|
| + return err
|
| + }
|
| +
|
| + c.dirty = false
|
| + return nil
|
| +}
|
| +
|
| +func (c *InstanceCache) ensureEntries() {
|
| + if c.state.Entries == nil {
|
| + c.state.Entries = map[string]*messages.InstanceCache_Entry{}
|
| + }
|
| +}
|
| +
|
| +// touch updates/adds last access time for an instance.
|
| +func (c *InstanceCache) touch(instanceID string, now time.Time) {
|
| + entry := c.state.Entries[instanceID]
|
| + if entry == nil {
|
| + entry = &messages.InstanceCache_Entry{}
|
| + c.ensureEntries()
|
| + c.state.Entries[instanceID] = entry
|
| + }
|
| + entry.LastAccess = google.NewTimestamp(now)
|
| +}
|
|
|