OLD | NEW |
1 // Package gs implements utility for accessing Skia perf data in Google Storage. | 1 // Package gs implements utility for accessing Skia perf data in Google Storage. |
2 package gs | 2 package gs |
3 | 3 |
4 import ( | 4 import ( |
5 "fmt" | 5 "fmt" |
6 "io/ioutil" | 6 "io/ioutil" |
7 "net/http" | 7 "net/http" |
8 "regexp" | 8 "regexp" |
9 "strings" | 9 "strings" |
| 10 "sync" |
| 11 "sync/atomic" |
10 "time" | 12 "time" |
11 | 13 |
12 "github.com/skia-dev/glog" | 14 "github.com/skia-dev/glog" |
13 "go.skia.org/infra/go/util" | 15 "go.skia.org/infra/go/util" |
14 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
15 "google.golang.org/cloud/storage" | 17 "google.golang.org/cloud/storage" |
16 ) | 18 ) |
17 | 19 |
18 var ( | 20 var ( |
19 // dirMap maps dataset name to a slice with Google Storage subdirectory
and file prefix. | 21 // dirMap maps dataset name to a slice with Google Storage subdirectory
and file prefix. |
(...skipping 118 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
138 total += len(list.Results) | 140 total += len(list.Results) |
139 glog.Infof("Loading %d more files from gs://%s/%s Total: %d", l
en(list.Results), bucket, folder, total) | 141 glog.Infof("Loading %d more files from gs://%s/%s Total: %d", l
en(list.Results), bucket, folder, total) |
140 for _, item := range list.Results { | 142 for _, item := range list.Results { |
141 callback(item) | 143 callback(item) |
142 } | 144 } |
143 q = list.Next | 145 q = list.Next |
144 } | 146 } |
145 return nil | 147 return nil |
146 } | 148 } |
147 | 149 |
148 func DeleteAllFilesInDir(s *storage.Client, bucket, folder string) error { | 150 // DeleteAllFilesInDir deletes all the files in a given folder. If processes is
set to > 1, |
149 » var deleteError bool | 151 // that many go routines will be spun up to delete the file simultaneously. |
| 152 func DeleteAllFilesInDir(s *storage.Client, bucket, folder string, processes int
) error { |
| 153 » errCount := int32(0) |
| 154 » var wg sync.WaitGroup |
| 155 » toDelete := make(chan string, 1000) |
| 156 » for i := 0; i < processes; i++ { |
| 157 » » go deleteHelper(s, bucket, &wg, toDelete, &errCount) |
| 158 » } |
150 del := func(item *storage.ObjectAttrs) { | 159 del := func(item *storage.ObjectAttrs) { |
151 » » if err := s.Bucket(bucket).Object(item.Name).Delete(context.Back
ground()); err != nil { | 160 » » toDelete <- item.Name |
152 » » » glog.Errorf("Problem deleting gs://%s/%s: %s", bucket, i
tem.Name, err) | |
153 » » » deleteError = true | |
154 » » } | |
155 } | 161 } |
156 if err := AllFilesInDir(s, bucket, folder, del); err != nil { | 162 if err := AllFilesInDir(s, bucket, folder, del); err != nil { |
157 return err | 163 return err |
158 } | 164 } |
159 » if deleteError { | 165 » close(toDelete) |
| 166 » wg.Wait() |
| 167 » if errCount > 0 { |
160 return fmt.Errorf("There were one or more problems when deleting
files in folder %q", folder) | 168 return fmt.Errorf("There were one or more problems when deleting
files in folder %q", folder) |
161 } | 169 } |
162 return nil | 170 return nil |
163 | 171 |
164 } | 172 } |
| 173 |
| 174 // deleteHelper spins and waits for work to come in on the toDelete channel. Wh
en it does, it |
| 175 // uses the storage client to delete the file from the given bucket. |
| 176 func deleteHelper(s *storage.Client, bucket string, wg *sync.WaitGroup, toDelete
<-chan string, errCount *int32) { |
| 177 wg.Add(1) |
| 178 defer wg.Done() |
| 179 for file := range toDelete { |
| 180 if err := s.Bucket(bucket).Object(file).Delete(context.Backgroun
d()); err != nil { |
| 181 glog.Errorf("Problem deleting gs://%s/%s: %s", bucket, f
ile, err) |
| 182 atomic.AddInt32(errCount, 1) |
| 183 } |
| 184 } |
| 185 } |
OLD | NEW |