Chromium Code Reviews| Index: client/cipd/internal/instancecache.go |
| diff --git a/client/cipd/internal/instancecache.go b/client/cipd/internal/instancecache.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..125bda17e7d44ff55c508ff323c7c0ae53f28e34 |
| --- /dev/null |
| +++ b/client/cipd/internal/instancecache.go |
| @@ -0,0 +1,311 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package internal |
| + |
| +import ( |
| + "fmt" |
| + "io" |
| + "io/ioutil" |
| + "math/rand" |
| + "os" |
| + "sync" |
| + "time" |
| + |
| + "github.com/golang/protobuf/proto" |
| + |
| + "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/proto/google" |
| + "github.com/luci/luci-go/common/stringset" |
| + |
| + "github.com/luci/luci-go/client/cipd/common" |
| + "github.com/luci/luci-go/client/cipd/internal/messages" |
| + "github.com/luci/luci-go/client/cipd/local" |
| +) |
| + |
| +const ( |
| + instanceCacheMaxSize = 100 |
| + // instanceCacheSyncInterval determines the frequency of |
| + // synchronization of state.db with files in the cache dir. |
| + instanceCacheSyncInterval = 8 * time.Hour |
| +) |
| + |
| +// InstanceCache is a file-system-based, thread-safe, LRU cache of instances. |
| +type InstanceCache struct { |
| + path local.FileSystem |
|
Vadim Sh.
2016/04/11 16:52:03
fs
it is not path
nodir
2016/04/11 22:16:16
Done.
|
| + lock sync.Mutex |
| + logger logging.Logger |
| + state messages.InstanceCache |
| + dirty bool |
| +} |
| + |
| +// LoadInstanceCache initializes InstanceCache. |
| +func LoadInstanceCache(path local.FileSystem, logger logging.Logger, now time.Time) *InstanceCache { |
| + if logger == nil { |
| + logger = logging.Null() |
| + } |
| + cache := &InstanceCache{ |
| + path: path, |
| + logger: logger, |
| + } |
| + cache.readState(now) |
| + return cache |
| +} |
| + |
| +// Get searches for the instance in cache and writes its contents to output. |
| +// If the instance is not found, returns an os.IsNotExists error without writing |
| +// to output. |
| +func (c *InstanceCache) Get(pin common.Pin, output io.Writer, now time.Time) error { |
| + c.lock.Lock() |
| + defer c.lock.Unlock() |
| + |
| + path, err := c.path.RootRelToAbs(pin.InstanceID) |
| + if err != nil { |
| + return fmt.Errorf("invalid instance ID %q", pin.InstanceID) |
| + } |
| + |
| + f, err := os.Open(path) |
| + if err != nil { |
| + if os.IsNotExist(err) { |
| + delete(c.state.Entries, pin.InstanceID) |
| + } |
| + return err |
| + } |
| + defer f.Close() |
| + |
| + c.touch(pin.InstanceID, now) |
| + |
| + _, err = io.Copy(output, f) |
| + return err |
| +} |
| + |
| +// Put caches instance contents. |
| +// May remove some packages that were not accessed for a long time. |
| +func (c *InstanceCache) Put(pin common.Pin, contents io.Reader, now time.Time) error { |
| + c.lock.Lock() |
| + defer c.lock.Unlock() |
| + |
| + path, err := c.path.RootRelToAbs(pin.InstanceID) |
| + if err != nil { |
| + return fmt.Errorf("invalid instance ID %q", pin.InstanceID) |
| + } |
| + |
| + c.touch(pin.InstanceID, now) |
| + |
| + if _, err := c.path.EnsureDirectory(c.path.Root()); err != nil { |
| + return err |
| + } |
| + |
| + f, err := os.Create(path) |
| + if err != nil { |
| + return err |
| + } |
| + closed := false |
| + defer func() { |
| + if !closed { |
| + f.Close() |
| + } |
| + }() |
| + if _, err = io.Copy(f, contents); err != nil { |
|
Vadim Sh.
2016/04/11 16:52:03
use FileSystem.EnsureFile to avoid:
1) two cipd p
nodir
2016/04/11 22:16:16
Done.
|
| + return err |
| + } |
| + closed = f.Close() == nil |
| + |
| + c.gc() |
| + return nil |
| +} |
| + |
| +// Save persists cache state to the state file. |
| +func (c *InstanceCache) Save() error { |
| + return c.saveState() |
| +} |
| + |
| +// Dirty returns true if cache state differs from the state file. |
| +func (c *InstanceCache) Dirty() bool { |
| + return c.dirty |
| +} |
| + |
| +// gc checks if the number of instances in the state is greater than maximum. |
| +// If yes, purges excessive oldest instances. |
| +func (c *InstanceCache) gc() { |
| + n := len(c.state.Entries) |
| + m := instanceCacheMaxSize |
| + if n <= m { |
| + return |
| + } |
| + |
| + // Among all instances, leave only top m recently-accessed. |
| + // Find the m-th recently-accessed instance. |
| + found := 0 |
| + var cutOff time.Time // lastAccess of found-th most-recently-accessed instance |
| + for _, s := range c.state.Entries { |
| + lastAccess := s.LastAccess.Time() |
| + switch { |
| + case found == 0: |
| + cutOff = lastAccess |
| + found++ |
| + |
| + case found < m: |
| + if lastAccess.Before(cutOff) { |
| + cutOff = lastAccess |
| + } |
| + found++ |
| + |
| + case found == m: |
| + if lastAccess.After(cutOff) { |
| + cutOff = lastAccess |
| + } |
| + } |
| + } |
| + |
| + // First m-n instances that were last accessed before cutOff are garbage. |
| + garbage := make([]string, 0, m-n) |
| + // Map iteration is not deterministic, but it is fine. |
| + for id, e := range c.state.Entries { |
| + if e.LastAccess.Time().Before(cutOff) { |
| + garbage = append(garbage, id) |
| + if len(garbage) == cap(garbage) { |
| + break |
| + } |
| + } |
| + } |
| + |
| + for _, id := range garbage { |
| + path, err := c.path.RootRelToAbs(id) |
| + if err != nil { |
| + panic("impossible") |
| + } |
| + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { |
| + c.logger.Errorf("cipd: could not delete %s from cache - %s", path, err) |
| + } |
| + delete(c.state.Entries, id) |
| + } |
| +} |
| + |
| +// statePath returns native absolute path of the state file. |
| +func (c *InstanceCache) statePath() string { |
| + path, err := c.path.RootRelToAbs("state.db") |
| + if err != nil { |
| + panic("impossible") |
| + } |
| + return path |
| +} |
| + |
| +// readState loads c.state from the state file. |
| +// If the file does not exist, corrupted or its state was not synchronized |
| +// with the instance files for a long time, synchronizes it. |
| +// Newly discovered files are considered last accessed at EPOCH. |
| +func (c *InstanceCache) readState(now time.Time) error { |
| + stateBytes, err := ioutil.ReadFile(c.statePath()) |
| + sync := false |
| + switch { |
| + case os.IsNotExist(err): |
| + sync = true |
| + |
| + case err != nil: |
| + c.logger.Errorf("cipd: could not read instance cache - %s", err) |
| + sync = true |
| + |
| + default: |
| + if err := proto.Unmarshal(stateBytes, &c.state); err != nil { |
| + c.logger.Errorf("cipd: instance cache file is corrupted - %s", err) |
| + c.state = messages.InstanceCache{} |
| + sync = true |
| + } else { |
| + cutOff := now. |
| + Add(-instanceCacheSyncInterval). |
| + Add(time.Duration(rand.Int63n(int64(5 * time.Minute)))) |
| + sync = c.state.LastSynced.Time().Before(cutOff) |
| + } |
| + } |
| + |
| + if sync { |
| + if err := c.syncState(now); err != nil { |
| + c.logger.Errorf("cipd: failed to sync instance cache - %s", err) |
| + } |
| + c.gc() |
| + } |
| + return nil |
| +} |
| + |
| +// syncState synchronizes the list of instances with instance files. |
| +// Preserves lastAccess of existing instances. Newly discovered files are |
| +// considered last accessed at EPOCH. |
| +func (c *InstanceCache) syncState(now time.Time) error { |
| + root, err := os.Open(c.path.Root()) |
| + switch { |
| + |
| + case os.IsNotExist(err): |
| + c.state.Entries = nil |
| + |
| + case err != nil: |
| + return err |
| + |
| + default: |
| + instanceIDs, err := root.Readdirnames(0) |
| + if err != nil { |
| + return err |
| + } |
| + existingIDs := stringset.New(len(instanceIDs)) |
| + for _, id := range instanceIDs { |
| + if common.ValidateInstanceID(id) != nil { |
| + continue |
| + } |
| + existingIDs.Add(id) |
| + |
| + if _, ok := c.state.Entries[id]; !ok { |
| + c.ensureEntries() |
| + c.state.Entries[id] = &messages.InstanceCache_Entry{} |
| + } |
| + } |
| + |
| + for id := range c.state.Entries { |
| + if !existingIDs.Has(id) { |
| + delete(c.state.Entries, id) |
| + } |
| + } |
| + } |
| + |
| + c.state.LastSynced = google.NewTimestamp(now) |
| + c.dirty = true |
| + return nil |
| +} |
| + |
| +// saveState persists the cache state. |
| +func (c *InstanceCache) saveState() error { |
| + stateBytes, err := proto.Marshal(&c.state) |
|
Vadim Sh.
2016/04/11 16:52:03
please use BlobWithSHA1 too: https://github.com/lu
nodir
2016/04/11 22:16:16
Done.
|
| + if err != nil { |
| + return err |
| + } |
| + |
| + if _, err := c.path.EnsureDirectory(c.path.Root()); err != nil { |
| + return err |
| + } |
| + if err := ioutil.WriteFile(c.statePath(), stateBytes, 0666); err != nil { |
|
Vadim Sh.
2016/04/11 16:52:03
use fs.EnsureFile
nodir
2016/04/11 22:16:16
Done.
|
| + return err |
| + } |
| + |
| + c.dirty = false |
| + return nil |
| +} |
| + |
| +func (c *InstanceCache) ensureEntries() { |
| + if c.state.Entries == nil { |
| + c.state.Entries = map[string]*messages.InstanceCache_Entry{} |
| + } |
| +} |
| + |
| +// touch updates/adds last access time for an instance. |
| +func (c *InstanceCache) touch(instanceID string, now time.Time) { |
| + entry := c.state.Entries[instanceID] |
| + if entry == nil { |
| + entry = &messages.InstanceCache_Entry{} |
| + c.ensureEntries() |
| + c.state.Entries[instanceID] = entry |
| + } |
| + if entry.LastAccess == nil { |
| + entry.LastAccess = &google.Timestamp{} |
| + } |
| + entry.LastAccess.FromTime(now) |
| +} |