OLD | NEW |
---|---|
1 package filediffstore | 1 package filediffstore |
2 | 2 |
3 import ( | 3 import ( |
4 "bytes" | 4 "bytes" |
5 "crypto/md5" | 5 "crypto/md5" |
6 "encoding/base64" | 6 "encoding/base64" |
7 "encoding/json" | 7 "encoding/json" |
8 "fmt" | 8 "fmt" |
9 "image" | 9 "image" |
10 "image/png" | 10 "image/png" |
11 "io" | 11 "io" |
12 "io/ioutil" | 12 "io/ioutil" |
13 "net/http" | 13 "net/http" |
14 "os" | 14 "os" |
15 "path/filepath" | 15 "path/filepath" |
16 "strings" | |
16 "sync" | 17 "sync" |
17 "time" | 18 "time" |
18 | 19 |
19 "github.com/boltdb/bolt" | 20 "github.com/boltdb/bolt" |
20 "github.com/hashicorp/golang-lru" | 21 "github.com/hashicorp/golang-lru" |
21 metrics "github.com/rcrowley/go-metrics" | 22 metrics "github.com/rcrowley/go-metrics" |
22 "github.com/skia-dev/glog" | 23 "github.com/skia-dev/glog" |
23 "go.skia.org/infra/go/fileutil" | 24 "go.skia.org/infra/go/fileutil" |
24 "go.skia.org/infra/go/gs" | 25 "go.skia.org/infra/go/gs" |
25 "go.skia.org/infra/go/util" | 26 "go.skia.org/infra/go/util" |
(...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
210 | 211 |
211 return f.failureDB.Update(func(tx *bolt.Tx) error { | 212 return f.failureDB.Update(func(tx *bolt.Tx) error { |
212 bucket, err := tx.CreateBucketIfNotExists([]byte(FAILURE_BUCKET) ) | 213 bucket, err := tx.CreateBucketIfNotExists([]byte(FAILURE_BUCKET) ) |
213 if err != nil { | 214 if err != nil { |
214 return err | 215 return err |
215 } | 216 } |
216 return bucket.Put([]byte(failure.Digest), jsonData) | 217 return bucket.Put([]byte(failure.Digest), jsonData) |
217 }) | 218 }) |
218 } | 219 } |
219 | 220 |
221 func (f *FileDiffStore) purgeDigestFailures(digests []string) error { | |
222 updated := false | |
223 err := f.failureDB.Update(func(tx *bolt.Tx) error { | |
224 bucket := tx.Bucket([]byte(FAILURE_BUCKET)) | |
225 if bucket == nil { | |
226 return nil | |
227 } | |
228 | |
229 for _, d := range digests { | |
230 if bucket.Get([]byte(d)) != nil { | |
231 updated = true | |
232 if err := bucket.Delete([]byte(d)); err != nil { | |
233 return err | |
234 } | |
235 } | |
236 } | |
237 | |
238 return nil | |
239 }) | |
240 | |
241 if (err == nil) && updated { | |
242 return f.loadDigestFailures() | |
243 } | |
244 return err | |
245 } | |
246 | |
220 // loadDigestFailures loads all digest failures to | 247 // loadDigestFailures loads all digest failures to |
221 func (f *FileDiffStore) loadDigestFailures() error { | 248 func (f *FileDiffStore) loadDigestFailures() error { |
222 newFailures := make(map[string]*diff.DigestFailure, len(f.unavailableDig ests)) | 249 newFailures := make(map[string]*diff.DigestFailure, len(f.unavailableDig ests)) |
223 err := f.failureDB.View(func(tx *bolt.Tx) error { | 250 err := f.failureDB.View(func(tx *bolt.Tx) error { |
224 bucket := tx.Bucket([]byte(FAILURE_BUCKET)) | 251 bucket := tx.Bucket([]byte(FAILURE_BUCKET)) |
225 if bucket == nil { | 252 if bucket == nil { |
226 return nil | 253 return nil |
227 } | 254 } |
228 | 255 |
229 cursor := bucket.Cursor() | 256 cursor := bucket.Cursor() |
230 for k, v := cursor.First(); k != nil; k, v = cursor.Next() { | 257 for k, v := cursor.First(); k != nil; k, v = cursor.Next() { |
231 dFailure := &diff.DigestFailure{} | 258 dFailure := &diff.DigestFailure{} |
232 if err := json.Unmarshal(v, dFailure); err != nil { | 259 if err := json.Unmarshal(v, dFailure); err != nil { |
233 return err | 260 return err |
234 } | 261 } |
235 newFailures[string(k)] = dFailure | 262 newFailures[string(k)] = dFailure |
236 } | 263 } |
237 return nil | 264 return nil |
238 }) | 265 }) |
239 if err == nil { | 266 if err == nil { |
240 f.unavailableMutex.Lock() | 267 f.unavailableMutex.Lock() |
241 f.unavailableDigests = newFailures | 268 f.unavailableDigests = newFailures |
242 f.unavailableMutex.Unlock() | 269 f.unavailableMutex.Unlock() |
243 } | 270 } |
244 return err | 271 return err |
245 } | 272 } |
246 | 273 |
247 func (f *FileDiffStore) PurgeDigests(digests []string, purgeGS bool) { | 274 func (f *FileDiffStore) PurgeDigests(digests []string, purgeGS bool) error { |
jcgregorio
2015/10/13 18:40:24
Needs unit tests.
stephana
2015/10/14 14:26:30
Done.
Note: The removal from GS and the list of u
| |
248 » // TODO (stephana): To be implemented in next CL. | 275 » // Remove from GS if requested. |
276 » if purgeGS { | |
277 » » for _, d := range digests { | |
278 » » » if err := f.removeImageFromGS(d); err != nil { | |
279 » » » » return err | |
280 » » » } | |
281 » » } | |
282 » } | |
283 | |
284 » for _, d := range digests { | |
285 » » if err := f.removeImageFromCache(d); err != nil { | |
286 » » » return err | |
287 » » } | |
288 » } | |
289 | |
290 » // Remove from image cache. | |
291 » for _, d := range digests { | |
292 » » f.imageCache.Remove(d) | |
293 » } | |
294 | |
295 » // Remove all metrics from disk cache. | |
296 » if err := f.removeDiffMetricsFromFileCache(digests); err != nil { | |
297 » » return err | |
298 » } | |
299 | |
300 » // Remove all diff metrics from LRU cache. | |
301 » for _, ki := range f.diffCache.Keys() { | |
302 » » k := ki.(string) | |
303 » » for _, d := range digests { | |
304 » » » if strings.Contains(k, d) { | |
305 » » » » f.diffCache.Remove(ki) | |
306 » » » } | |
307 » » } | |
308 » } | |
309 » return f.purgeDigestFailures(digests) | |
249 } | 310 } |
250 | 311 |
251 type WorkerReq struct { | 312 type WorkerReq struct { |
252 id interface{} | 313 id interface{} |
253 respCh chan<- *WorkerResp | 314 respCh chan<- *WorkerResp |
254 } | 315 } |
255 | 316 |
256 type WorkerResp struct { | 317 type WorkerResp struct { |
257 id interface{} | 318 id interface{} |
258 val interface{} | 319 val interface{} |
(...skipping 259 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
518 d, err := json.MarshalIndent(diffMetrics, "", " ") | 579 d, err := json.MarshalIndent(diffMetrics, "", " ") |
519 if err != nil { | 580 if err != nil { |
520 return fmt.Errorf("Failed to encode to JSON: %s", err) | 581 return fmt.Errorf("Failed to encode to JSON: %s", err) |
521 } | 582 } |
522 if _, err := f.Write(d); err != nil { | 583 if _, err := f.Write(d); err != nil { |
523 return fmt.Errorf("Failed to write to file: %v", err) | 584 return fmt.Errorf("Failed to write to file: %v", err) |
524 } | 585 } |
525 return nil | 586 return nil |
526 } | 587 } |
527 | 588 |
589 func (fs *FileDiffStore) removeDiffMetricsFromFileCache(digests []string) error { | |
590 fs.diffDirLock.Lock() | |
591 defer fs.diffDirLock.Unlock() | |
592 | |
593 // Walk the entire cache and remove all files are contained in the list of digests. | |
594 return filepath.Walk(fs.localDiffMetricsDir, func(path string, info os.F ileInfo, err error) error { | |
595 if !info.IsDir() { | |
596 for _, d := range digests { | |
597 if (len(d) > 0) && strings.Contains(info.Name(), d) { | |
598 if err := os.Remove(path); err != nil { | |
599 return err | |
600 } | |
601 } | |
602 } | |
603 } | |
604 return nil | |
605 }) | |
606 } | |
607 | |
528 // Returns the file basename to use for the specified digests. | 608 // Returns the file basename to use for the specified digests. |
529 // Eg: Returns 111-222 since 111 < 222 when 111 and 222 are specified as inputs | 609 // Eg: Returns 111-222 since 111 < 222 when 111 and 222 are specified as inputs |
530 // regardless of the order. | 610 // regardless of the order. |
531 func getDiffBasename(d1, d2 string) string { | 611 func getDiffBasename(d1, d2 string) string { |
532 if d1 < d2 { | 612 if d1 < d2 { |
533 return fmt.Sprintf("%s-%s", d1, d2) | 613 return fmt.Sprintf("%s-%s", d1, d2) |
534 } | 614 } |
535 return fmt.Sprintf("%s-%s", d2, d1) | 615 return fmt.Sprintf("%s-%s", d2, d1) |
536 } | 616 } |
537 | 617 |
(...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
689 glog.Errorf("Error fetching file for digest %s: %s", d, err) | 769 glog.Errorf("Error fetching file for digest %s: %s", d, err) |
690 } | 770 } |
691 | 771 |
692 if err != nil { | 772 if err != nil { |
693 glog.Errorf("Failed fetching file after %d attempts", MAX_URI_GE T_TRIES) | 773 glog.Errorf("Failed fetching file after %d attempts", MAX_URI_GE T_TRIES) |
694 downloadFailureCount.Inc(1) | 774 downloadFailureCount.Inc(1) |
695 } | 775 } |
696 return err | 776 return err |
697 } | 777 } |
698 | 778 |
779 func (fs *FileDiffStore) removeImageFromGS(d string) error { | |
780 storage, err := storage.New(fs.client) | |
781 if err != nil { | |
782 return fmt.Errorf("Failed to create interface to Google Storage: %s\n", err) | |
783 } | |
784 | |
785 objLocation := filepath.Join(fs.storageBaseDir, fmt.Sprintf("%s.%s", d, IMG_EXTENSION)) | |
786 if err := storage.Objects.Delete(fs.gsBucketName, objLocation).Do(); err != nil { | |
787 return fmt.Errorf("Unable to delete %s/%s: %s", fs.gsBucketName , objLocation, err) | |
788 } | |
789 return nil | |
790 } | |
791 | |
792 func (fs *FileDiffStore) removeImageFromCache(d string) error { | |
793 fs.digestDirLock.Lock() | |
794 defer fs.digestDirLock.Unlock() | |
795 path := fs.getDigestImagePath(d) | |
796 if _, err := os.Stat(path); os.IsNotExist(err) { | |
797 return nil | |
798 } | |
799 return os.Remove(path) | |
800 } | |
801 | |
699 // Returns the response body of the specified GS object. Tries MAX_URI_GET_TRIES | 802 // Returns the response body of the specified GS object. Tries MAX_URI_GET_TRIES |
700 // times if download is unsuccessful. Client must close the response body when | 803 // times if download is unsuccessful. Client must close the response body when |
701 // finished with it. | 804 // finished with it. |
702 func (fs *FileDiffStore) getRespBody(res *storage.Object) (io.ReadCloser, error) { | 805 func (fs *FileDiffStore) getRespBody(res *storage.Object) (io.ReadCloser, error) { |
703 request, err := gs.RequestForStorageURL(res.MediaLink) | 806 request, err := gs.RequestForStorageURL(res.MediaLink) |
704 if err != nil { | 807 if err != nil { |
705 return nil, fmt.Errorf("Unable to create Storage MediaURI reques t: %s\n", err) | 808 return nil, fmt.Errorf("Unable to create Storage MediaURI reques t: %s\n", err) |
706 } | 809 } |
707 | 810 |
708 resp, err := fs.client.Do(request) | 811 resp, err := fs.client.Do(request) |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
818 | 921 |
819 func (fs *FileDiffStore) createRadixPath(baseDir, fileName string) (string, erro r) { | 922 func (fs *FileDiffStore) createRadixPath(baseDir, fileName string) (string, erro r) { |
820 targetPath := fileutil.TwoLevelRadixPath(baseDir, fileName) | 923 targetPath := fileutil.TwoLevelRadixPath(baseDir, fileName) |
821 radixDir, _ := filepath.Split(targetPath) | 924 radixDir, _ := filepath.Split(targetPath) |
822 if err := os.MkdirAll(radixDir, 0700); err != nil { | 925 if err := os.MkdirAll(radixDir, 0700); err != nil { |
823 return "", err | 926 return "", err |
824 } | 927 } |
825 | 928 |
826 return targetPath, nil | 929 return targetPath, nil |
827 } | 930 } |
OLD | NEW |