OLD | NEW |
1 package filediffstore | 1 package filediffstore |
2 | 2 |
3 import ( | 3 import ( |
4 "bytes" | 4 "bytes" |
5 "crypto/md5" | 5 "crypto/md5" |
6 "encoding/base64" | 6 "encoding/base64" |
7 "encoding/json" | 7 "encoding/json" |
8 "fmt" | 8 "fmt" |
9 "image" | 9 "image" |
10 "image/png" | 10 "image/png" |
11 "io" | 11 "io" |
12 "io/ioutil" | 12 "io/ioutil" |
13 "net/http" | 13 "net/http" |
14 "os" | 14 "os" |
15 "path/filepath" | 15 "path/filepath" |
| 16 "strings" |
16 "sync" | 17 "sync" |
17 "time" | 18 "time" |
18 | 19 |
19 "github.com/boltdb/bolt" | 20 "github.com/boltdb/bolt" |
20 "github.com/hashicorp/golang-lru" | 21 "github.com/hashicorp/golang-lru" |
21 metrics "github.com/rcrowley/go-metrics" | 22 metrics "github.com/rcrowley/go-metrics" |
22 "github.com/skia-dev/glog" | 23 "github.com/skia-dev/glog" |
23 "go.skia.org/infra/go/fileutil" | 24 "go.skia.org/infra/go/fileutil" |
24 "go.skia.org/infra/go/gs" | 25 "go.skia.org/infra/go/gs" |
25 "go.skia.org/infra/go/util" | 26 "go.skia.org/infra/go/util" |
(...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
210 | 211 |
211 return f.failureDB.Update(func(tx *bolt.Tx) error { | 212 return f.failureDB.Update(func(tx *bolt.Tx) error { |
212 bucket, err := tx.CreateBucketIfNotExists([]byte(FAILURE_BUCKET)
) | 213 bucket, err := tx.CreateBucketIfNotExists([]byte(FAILURE_BUCKET)
) |
213 if err != nil { | 214 if err != nil { |
214 return err | 215 return err |
215 } | 216 } |
216 return bucket.Put([]byte(failure.Digest), jsonData) | 217 return bucket.Put([]byte(failure.Digest), jsonData) |
217 }) | 218 }) |
218 } | 219 } |
219 | 220 |
| 221 func (f *FileDiffStore) purgeDigestFailures(digests []string) error { |
| 222 updated := false |
| 223 err := f.failureDB.Update(func(tx *bolt.Tx) error { |
| 224 bucket := tx.Bucket([]byte(FAILURE_BUCKET)) |
| 225 if bucket == nil { |
| 226 return nil |
| 227 } |
| 228 |
| 229 for _, d := range digests { |
| 230 if bucket.Get([]byte(d)) != nil { |
| 231 updated = true |
| 232 if err := bucket.Delete([]byte(d)); err != nil { |
| 233 return err |
| 234 } |
| 235 } |
| 236 } |
| 237 |
| 238 return nil |
| 239 }) |
| 240 |
| 241 if (err == nil) && updated { |
| 242 return f.loadDigestFailures() |
| 243 } |
| 244 return err |
| 245 } |
| 246 |
220 // loadDigestFailures loads all digest failures to | 247 // loadDigestFailures loads all digest failures to |
221 func (f *FileDiffStore) loadDigestFailures() error { | 248 func (f *FileDiffStore) loadDigestFailures() error { |
222 newFailures := make(map[string]*diff.DigestFailure, len(f.unavailableDig
ests)) | 249 newFailures := make(map[string]*diff.DigestFailure, len(f.unavailableDig
ests)) |
223 err := f.failureDB.View(func(tx *bolt.Tx) error { | 250 err := f.failureDB.View(func(tx *bolt.Tx) error { |
224 bucket := tx.Bucket([]byte(FAILURE_BUCKET)) | 251 bucket := tx.Bucket([]byte(FAILURE_BUCKET)) |
225 if bucket == nil { | 252 if bucket == nil { |
226 return nil | 253 return nil |
227 } | 254 } |
228 | 255 |
229 cursor := bucket.Cursor() | 256 cursor := bucket.Cursor() |
230 for k, v := cursor.First(); k != nil; k, v = cursor.Next() { | 257 for k, v := cursor.First(); k != nil; k, v = cursor.Next() { |
231 dFailure := &diff.DigestFailure{} | 258 dFailure := &diff.DigestFailure{} |
232 if err := json.Unmarshal(v, dFailure); err != nil { | 259 if err := json.Unmarshal(v, dFailure); err != nil { |
233 return err | 260 return err |
234 } | 261 } |
235 newFailures[string(k)] = dFailure | 262 newFailures[string(k)] = dFailure |
236 } | 263 } |
237 return nil | 264 return nil |
238 }) | 265 }) |
239 if err == nil { | 266 if err == nil { |
240 f.unavailableMutex.Lock() | 267 f.unavailableMutex.Lock() |
241 f.unavailableDigests = newFailures | 268 f.unavailableDigests = newFailures |
242 f.unavailableMutex.Unlock() | 269 f.unavailableMutex.Unlock() |
243 } | 270 } |
244 return err | 271 return err |
245 } | 272 } |
246 | 273 |
247 func (f *FileDiffStore) PurgeDigests(digests []string, purgeGS bool) { | 274 func (f *FileDiffStore) PurgeDigests(digests []string, purgeGS bool) error { |
248 » // TODO (stephana): To be implemented in next CL. | 275 » // Remove from GS if requested. |
| 276 » if purgeGS { |
| 277 » » for _, d := range digests { |
| 278 » » » if err := f.removeImageFromGS(d); err != nil { |
| 279 » » » » return err |
| 280 » » » } |
| 281 » » } |
| 282 » } |
| 283 |
| 284 » for _, d := range digests { |
| 285 » » if err := f.removeImageFromCache(d); err != nil { |
| 286 » » » return err |
| 287 » » } |
| 288 » } |
| 289 |
| 290 » // Remove from image cache. |
| 291 » for _, d := range digests { |
| 292 » » f.imageCache.Remove(d) |
| 293 » } |
| 294 |
| 295 » // Remove all metrics from disk cache. |
| 296 » if err := f.removeDiffMetricsFromFileCache(digests); err != nil { |
| 297 » » return err |
| 298 » } |
| 299 |
| 300 » // Remove all diff metrics from LRU cache. |
| 301 » for _, ki := range f.diffCache.Keys() { |
| 302 » » k := ki.(string) |
| 303 » » for _, d := range digests { |
| 304 » » » if strings.Contains(k, d) { |
| 305 » » » » f.diffCache.Remove(ki) |
| 306 » » » } |
| 307 » » } |
| 308 » } |
| 309 » return f.purgeDigestFailures(digests) |
249 } | 310 } |
250 | 311 |
251 type WorkerReq struct { | 312 type WorkerReq struct { |
252 id interface{} | 313 id interface{} |
253 respCh chan<- *WorkerResp | 314 respCh chan<- *WorkerResp |
254 } | 315 } |
255 | 316 |
256 type WorkerResp struct { | 317 type WorkerResp struct { |
257 id interface{} | 318 id interface{} |
258 val interface{} | 319 val interface{} |
(...skipping 259 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
518 d, err := json.MarshalIndent(diffMetrics, "", " ") | 579 d, err := json.MarshalIndent(diffMetrics, "", " ") |
519 if err != nil { | 580 if err != nil { |
520 return fmt.Errorf("Failed to encode to JSON: %s", err) | 581 return fmt.Errorf("Failed to encode to JSON: %s", err) |
521 } | 582 } |
522 if _, err := f.Write(d); err != nil { | 583 if _, err := f.Write(d); err != nil { |
523 return fmt.Errorf("Failed to write to file: %v", err) | 584 return fmt.Errorf("Failed to write to file: %v", err) |
524 } | 585 } |
525 return nil | 586 return nil |
526 } | 587 } |
527 | 588 |
| 589 func (fs *FileDiffStore) removeDiffMetricsFromFileCache(digests []string) error
{ |
| 590 fs.diffDirLock.Lock() |
| 591 defer fs.diffDirLock.Unlock() |
| 592 |
| 593 // Walk the entire cache and remove all files are contained in the list
of digests. |
| 594 return filepath.Walk(fs.localDiffMetricsDir, func(path string, info os.F
ileInfo, err error) error { |
| 595 if !info.IsDir() { |
| 596 for _, d := range digests { |
| 597 if (len(d) > 0) && strings.Contains(info.Name(),
d) { |
| 598 if err := os.Remove(path); err != nil { |
| 599 return err |
| 600 } |
| 601 } |
| 602 } |
| 603 } |
| 604 return nil |
| 605 }) |
| 606 } |
| 607 |
528 // Returns the file basename to use for the specified digests. | 608 // Returns the file basename to use for the specified digests. |
529 // Eg: Returns 111-222 since 111 < 222 when 111 and 222 are specified as inputs | 609 // Eg: Returns 111-222 since 111 < 222 when 111 and 222 are specified as inputs |
530 // regardless of the order. | 610 // regardless of the order. |
531 func getDiffBasename(d1, d2 string) string { | 611 func getDiffBasename(d1, d2 string) string { |
532 if d1 < d2 { | 612 if d1 < d2 { |
533 return fmt.Sprintf("%s-%s", d1, d2) | 613 return fmt.Sprintf("%s-%s", d1, d2) |
534 } | 614 } |
535 return fmt.Sprintf("%s-%s", d2, d1) | 615 return fmt.Sprintf("%s-%s", d2, d1) |
536 } | 616 } |
537 | 617 |
(...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
689 glog.Errorf("Error fetching file for digest %s: %s", d, err) | 769 glog.Errorf("Error fetching file for digest %s: %s", d, err) |
690 } | 770 } |
691 | 771 |
692 if err != nil { | 772 if err != nil { |
693 glog.Errorf("Failed fetching file after %d attempts", MAX_URI_GE
T_TRIES) | 773 glog.Errorf("Failed fetching file after %d attempts", MAX_URI_GE
T_TRIES) |
694 downloadFailureCount.Inc(1) | 774 downloadFailureCount.Inc(1) |
695 } | 775 } |
696 return err | 776 return err |
697 } | 777 } |
698 | 778 |
| 779 func (fs *FileDiffStore) removeImageFromGS(d string) error { |
| 780 storage, err := storage.New(fs.client) |
| 781 if err != nil { |
| 782 return fmt.Errorf("Failed to create interface to Google Storage:
%s\n", err) |
| 783 } |
| 784 |
| 785 objLocation := filepath.Join(fs.storageBaseDir, fmt.Sprintf("%s.%s", d,
IMG_EXTENSION)) |
| 786 if err := storage.Objects.Delete(fs.gsBucketName, objLocation).Do(); err
!= nil { |
| 787 return fmt.Errorf("Unable to delete %s/%s: %s", fs.gsBucketName
, objLocation, err) |
| 788 } |
| 789 return nil |
| 790 } |
| 791 |
| 792 func (fs *FileDiffStore) removeImageFromCache(d string) error { |
| 793 fs.digestDirLock.Lock() |
| 794 defer fs.digestDirLock.Unlock() |
| 795 path := fs.getDigestImagePath(d) |
| 796 if _, err := os.Stat(path); os.IsNotExist(err) { |
| 797 return nil |
| 798 } |
| 799 return os.Remove(path) |
| 800 } |
| 801 |
699 // Returns the response body of the specified GS object. Tries MAX_URI_GET_TRIES | 802 // Returns the response body of the specified GS object. Tries MAX_URI_GET_TRIES |
700 // times if download is unsuccessful. Client must close the response body when | 803 // times if download is unsuccessful. Client must close the response body when |
701 // finished with it. | 804 // finished with it. |
702 func (fs *FileDiffStore) getRespBody(res *storage.Object) (io.ReadCloser, error)
{ | 805 func (fs *FileDiffStore) getRespBody(res *storage.Object) (io.ReadCloser, error)
{ |
703 request, err := gs.RequestForStorageURL(res.MediaLink) | 806 request, err := gs.RequestForStorageURL(res.MediaLink) |
704 if err != nil { | 807 if err != nil { |
705 return nil, fmt.Errorf("Unable to create Storage MediaURI reques
t: %s\n", err) | 808 return nil, fmt.Errorf("Unable to create Storage MediaURI reques
t: %s\n", err) |
706 } | 809 } |
707 | 810 |
708 resp, err := fs.client.Do(request) | 811 resp, err := fs.client.Do(request) |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
818 | 921 |
819 func (fs *FileDiffStore) createRadixPath(baseDir, fileName string) (string, erro
r) { | 922 func (fs *FileDiffStore) createRadixPath(baseDir, fileName string) (string, erro
r) { |
820 targetPath := fileutil.TwoLevelRadixPath(baseDir, fileName) | 923 targetPath := fileutil.TwoLevelRadixPath(baseDir, fileName) |
821 radixDir, _ := filepath.Split(targetPath) | 924 radixDir, _ := filepath.Split(targetPath) |
822 if err := os.MkdirAll(radixDir, 0700); err != nil { | 925 if err := os.MkdirAll(radixDir, 0700); err != nil { |
823 return "", err | 926 return "", err |
824 } | 927 } |
825 | 928 |
826 return targetPath, nil | 929 return targetPath, nil |
827 } | 930 } |
OLD | NEW |