Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package internal | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "container/heap" | |
| 10 "fmt" | |
| 11 "io" | |
| 12 "io/ioutil" | |
| 13 "math/rand" | |
| 14 "os" | |
| 15 "sync" | |
| 16 "time" | |
| 17 | |
| 18 "github.com/luci/luci-go/common/logging" | |
| 19 "github.com/luci/luci-go/common/proto/google" | |
| 20 "github.com/luci/luci-go/common/stringset" | |
| 21 | |
| 22 "github.com/luci/luci-go/client/cipd/common" | |
| 23 "github.com/luci/luci-go/client/cipd/internal/messages" | |
| 24 "github.com/luci/luci-go/client/cipd/local" | |
| 25 ) | |
| 26 | |
| 27 const ( | |
| 28 instanceCacheMaxSize = 300 | |
| 29 // instanceCacheSyncInterval determines the frequency of | |
| 30 // synchronization of state.db with instance files in the cache dir. | |
| 31 instanceCacheSyncInterval = 8 * time.Hour | |
| 32 instanceCacheStateFilename = "state.db" | |
| 33 ) | |
| 34 | |
| 35 // InstanceCache is a file-system-based, thread-safe, LRU cache of instances. | |
| 36 // | |
| 37 // Does not validate instance hashes; it is caller's responsibility. | |
| 38 type InstanceCache struct { | |
| 39 fs local.FileSystem | |
| 40 stateLock sync.Mutex // synchronizes access to the state file. | |
| 41 logger logging.Logger | |
| 42 } | |
| 43 | |
| 44 // NewInstanceCache initializes InstanceCache. | |
| 45 // fs will be the root of the cache. | |
| 46 func NewInstanceCache(fs local.FileSystem, logger logging.Logger) *InstanceCache { | |
| 47 if logger == nil { | |
| 48 logger = logging.Null() | |
| 49 } | |
| 50 return &InstanceCache{ | |
| 51 fs: fs, | |
| 52 logger: logger, | |
| 53 } | |
| 54 } | |
| 55 | |
| 56 // Get searches for the instance in the cache and writes its contents to output. | |
| 57 // If the instance is not found, returns an os.IsNotExists error without writing | |
| 58 // to output. | |
| 59 func (c *InstanceCache) Get(pin common.Pin, output io.Writer, now time.Time) err or { | |
| 60 if err := common.ValidatePin(pin); err != nil { | |
| 61 return err | |
| 62 } | |
| 63 | |
| 64 path, err := c.fs.RootRelToAbs(pin.InstanceID) | |
| 65 if err != nil { | |
| 66 return fmt.Errorf("invalid instance ID %q", pin.InstanceID) | |
| 67 } | |
| 68 | |
| 69 f, err := os.Open(path) | |
| 70 if err != nil { | |
| 71 return err | |
| 72 } | |
| 73 defer f.Close() | |
| 74 | |
| 75 c.withState(now, func(s *messages.InstanceCache) { | |
| 76 touch(s, pin.InstanceID, now) | |
| 77 }) | |
| 78 | |
| 79 _, err = io.Copy(output, f) | |
| 80 return err | |
| 81 } | |
| 82 | |
| 83 // Put caches an instance. | |
| 84 // write must write the instance contents. | |
| 85 // May remove some instances from the cache that were not accessed for a long ti me. | |
| 86 func (c *InstanceCache) Put(pin common.Pin, now time.Time, write func(*os.File) error) error { | |
|
Vadim Sh.
2016/04/13 21:49:02
btw, why *os.File and not io.WriteSeeker?
nodir
2016/04/13 23:23:52
because *os.File is what Put provides.
In general
| |
| 87 if err := common.ValidatePin(pin); err != nil { | |
| 88 return err | |
| 89 } | |
| 90 path, err := c.fs.RootRelToAbs(pin.InstanceID) | |
| 91 if err != nil { | |
| 92 return fmt.Errorf("invalid instance ID %q", pin.InstanceID) | |
| 93 } | |
| 94 | |
| 95 if err := c.fs.EnsureFile(path, write); err != nil { | |
| 96 return err | |
| 97 } | |
| 98 | |
| 99 c.withState(now, func(s *messages.InstanceCache) { | |
| 100 touch(s, pin.InstanceID, now) | |
| 101 c.gc(s) | |
| 102 }) | |
| 103 return nil | |
| 104 } | |
| 105 | |
| 106 type timeHeap []time.Time | |
| 107 | |
| 108 func (h timeHeap) Len() int { return len(h) } | |
| 109 func (h timeHeap) Less(i, j int) bool { return h[i].Before(h[j]) } | |
| 110 func (h timeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } | |
| 111 func (h *timeHeap) Push(x interface{}) { | |
| 112 *h = append(*h, x.(time.Time)) | |
| 113 } | |
| 114 func (h *timeHeap) Pop() interface{} { | |
| 115 old := *h | |
| 116 n := len(old) | |
| 117 x := old[n-1] | |
| 118 *h = old[0 : n-1] | |
| 119 return x | |
| 120 } | |
| 121 | |
| 122 // gc checks if the number of instances in the state is greater than maximum. | |
| 123 // If yes, purges excessive oldest instances. | |
| 124 func (c *InstanceCache) gc(state *messages.InstanceCache) { | |
| 125 garbageSize := len(state.Entries) - instanceCacheMaxSize | |
| 126 if garbageSize <= 0 { | |
| 127 return | |
| 128 } | |
| 129 | |
| 130 // Compute cutoff date by putting all access times to a heap | |
| 131 // and pop from it garbageSize times. | |
| 132 lastAccessTimes := make(timeHeap, 0, len(state.Entries)) | |
| 133 for _, s := range state.Entries { | |
| 134 lastAccessTimes = append(lastAccessTimes, s.LastAccess.Time()) | |
| 135 } | |
| 136 heap.Init(&lastAccessTimes) | |
| 137 for i := 0; i < garbageSize-1; i++ { | |
| 138 heap.Pop(&lastAccessTimes) | |
| 139 } | |
| 140 cutOff := heap.Pop(&lastAccessTimes).(time.Time) | |
| 141 | |
| 142 // First garbageSize instances that were last accessed on or before cutO ff are garbage. | |
| 143 garbage := make([]string, 0, garbageSize) | |
| 144 // Map iteration is not deterministic, but it is fine. | |
| 145 for id, e := range state.Entries { | |
| 146 if !e.LastAccess.Time().After(cutOff) { | |
| 147 garbage = append(garbage, id) | |
| 148 if len(garbage) == cap(garbage) { | |
| 149 break | |
| 150 } | |
| 151 } | |
| 152 } | |
| 153 | |
| 154 collected := 0 | |
| 155 for _, id := range garbage { | |
| 156 path, err := c.fs.RootRelToAbs(id) | |
| 157 if err != nil { | |
| 158 panic("impossible") | |
| 159 } | |
| 160 if c.fs.EnsureFileGone(path) != nil { | |
| 161 // EnsureFileGone logs errors. | |
| 162 continue | |
| 163 } | |
| 164 delete(state.Entries, id) | |
| 165 collected++ | |
| 166 } | |
| 167 c.logger.Infof("cipd: instance cache collected %d instances", collected) | |
| 168 } | |
| 169 | |
| 170 // readState loads cache state from the state file. | |
| 171 // If the file does not exist, corrupted or its state was not synchronized | |
| 172 // with the instance files for a long time, synchronizes it. | |
| 173 // Newly discovered files are considered last accessed at zero time. | |
| 174 // If synchronization fails, then the state is considered empty. | |
| 175 func (c *InstanceCache) readState(state *messages.InstanceCache, now time.Time) { | |
| 176 statePath, err := c.fs.RootRelToAbs(instanceCacheStateFilename) | |
| 177 if err != nil { | |
| 178 panic("impossible") | |
| 179 } | |
| 180 | |
| 181 stateBytes, err := ioutil.ReadFile(statePath) | |
| 182 sync := false | |
| 183 switch { | |
| 184 case os.IsNotExist(err): | |
| 185 sync = true | |
| 186 | |
| 187 case err != nil: | |
| 188 c.logger.Warningf("cipd: could not read instance cache - %s", er r) | |
| 189 sync = true | |
| 190 | |
| 191 default: | |
| 192 if err := UnmarshalWithSHA1(stateBytes, state); err != nil { | |
| 193 c.logger.Warningf("cipd: instance cache file is corrupte d - %s", err) | |
| 194 *state = messages.InstanceCache{} | |
| 195 sync = true | |
| 196 } else { | |
| 197 cutOff := now. | |
| 198 Add(-instanceCacheSyncInterval). | |
| 199 Add(time.Duration(rand.Int63n(int64(5 * time.Min ute)))) | |
| 200 sync = state.LastSynced.Time().Before(cutOff) | |
| 201 } | |
| 202 } | |
| 203 | |
| 204 if sync { | |
| 205 if err := c.syncState(state, now); err != nil { | |
| 206 c.logger.Warningf("cipd: failed to sync instance cache - %s", err) | |
| 207 } | |
| 208 c.gc(state) | |
| 209 } | |
| 210 } | |
| 211 | |
| 212 // syncState synchronizes the list of instances in the state file with instance files. | |
| 213 // Preserves lastAccess of existing instances. Newly discovered files are | |
| 214 // considered last accessed at zero time. | |
| 215 func (c *InstanceCache) syncState(state *messages.InstanceCache, now time.Time) error { | |
| 216 root, err := os.Open(c.fs.Root()) | |
| 217 switch { | |
| 218 | |
| 219 case os.IsNotExist(err): | |
| 220 state.Entries = nil | |
| 221 | |
| 222 case err != nil: | |
| 223 return err | |
| 224 | |
| 225 default: | |
| 226 instanceIDs, err := root.Readdirnames(0) | |
| 227 if err != nil { | |
| 228 return err | |
| 229 } | |
| 230 existingIDs := stringset.New(len(instanceIDs)) | |
| 231 for _, id := range instanceIDs { | |
| 232 if common.ValidateInstanceID(id) != nil { | |
| 233 continue | |
| 234 } | |
| 235 existingIDs.Add(id) | |
| 236 | |
| 237 if _, ok := state.Entries[id]; !ok { | |
| 238 if state.Entries == nil { | |
| 239 state.Entries = map[string]*messages.Ins tanceCache_Entry{} | |
| 240 } | |
| 241 state.Entries[id] = &messages.InstanceCache_Entr y{} | |
| 242 } | |
| 243 } | |
| 244 | |
| 245 for id := range state.Entries { | |
| 246 if !existingIDs.Has(id) { | |
| 247 delete(state.Entries, id) | |
| 248 } | |
| 249 } | |
| 250 } | |
| 251 | |
| 252 state.LastSynced = google.NewTimestamp(now) | |
| 253 c.logger.Infof("cipd: synchronized instance cache with instance files") | |
| 254 return nil | |
| 255 } | |
| 256 | |
| 257 // saveState persists the cache state. | |
| 258 func (c *InstanceCache) saveState(state *messages.InstanceCache) error { | |
| 259 stateBytes, err := MarshalWithSHA1(state) | |
| 260 if err != nil { | |
| 261 return err | |
| 262 } | |
| 263 | |
| 264 statePath, err := c.fs.RootRelToAbs(instanceCacheStateFilename) | |
| 265 if err != nil { | |
| 266 panic("impossible") | |
| 267 } | |
| 268 | |
| 269 return local.EnsureFile(c.fs, statePath, bytes.NewReader(stateBytes)) | |
|
Vadim Sh.
2016/04/13 21:49:02
we will need to teach EnsureFile to retry access e
nodir
2016/04/13 23:23:52
yeah, that's what I thought when I saw your CL
| |
| 270 } | |
| 271 | |
| 272 // withState loads cache state from the state file, calls f and saves it back. | |
| 273 // See also readState. | |
| 274 func (c *InstanceCache) withState(now time.Time, f func(*messages.InstanceCache) ) { | |
| 275 c.stateLock.Lock() | |
| 276 defer c.stateLock.Unlock() | |
| 277 | |
| 278 state := &messages.InstanceCache{} | |
| 279 | |
| 280 start := time.Now() | |
| 281 c.readState(state, now) | |
| 282 loadSaveTime := time.Since(start) | |
| 283 | |
| 284 f(state) | |
| 285 | |
| 286 start = time.Now() | |
| 287 if err := c.saveState(state); err != nil { | |
| 288 c.logger.Warningf("cipd: could not save instance cache - %s", er r) | |
| 289 } | |
| 290 loadSaveTime += time.Since(start) | |
| 291 | |
| 292 if loadSaveTime > time.Second { | |
| 293 c.logger.Warningf("cipd: loading and saving instance cache with %d entries took %s", len(state.Entries), loadSaveTime) | |
| 294 } | |
| 295 } | |
| 296 | |
| 297 // getAccessTime returns last access time of an instance. | |
| 298 // Used for testing. | |
| 299 func (c *InstanceCache) getAccessTime(now time.Time, pin common.Pin) (lastAccess time.Time, ok bool) { | |
| 300 c.withState(now, func(s *messages.InstanceCache) { | |
| 301 var entry *messages.InstanceCache_Entry | |
| 302 if entry, ok = s.Entries[pin.InstanceID]; ok { | |
| 303 lastAccess = entry.LastAccess.Time() | |
| 304 } | |
| 305 }) | |
| 306 return | |
| 307 } | |
| 308 | |
| 309 // touch updates/adds last access time for an instance. | |
| 310 func touch(state *messages.InstanceCache, instanceID string, now time.Time) { | |
| 311 entry := state.Entries[instanceID] | |
| 312 if entry == nil { | |
| 313 entry = &messages.InstanceCache_Entry{} | |
| 314 if state.Entries == nil { | |
| 315 state.Entries = map[string]*messages.InstanceCache_Entry {} | |
| 316 } | |
| 317 state.Entries[instanceID] = entry | |
| 318 } | |
| 319 entry.LastAccess = google.NewTimestamp(now) | |
| 320 } | |
| OLD | NEW |