OLD | NEW |
(Empty) | |
| 1 // Google Storage utility that contains methods for both CT master and worker |
| 2 // scripts. |
| 3 package util |
| 4 |
| 5 import ( |
| 6 "fmt" |
| 7 "io" |
| 8 "io/ioutil" |
| 9 "net/http" |
| 10 "os" |
| 11 "path/filepath" |
| 12 "strings" |
| 13 "sync" |
| 14 |
| 15 "github.com/golang/glog" |
| 16 |
| 17 "code.google.com/p/google-api-go-client/storage/v1" |
| 18 "skia.googlesource.com/buildbot.git/go/auth" |
| 19 "skia.googlesource.com/buildbot.git/go/gs" |
| 20 ) |
| 21 |
| 22 type GsUtil struct { |
| 23 // The client used to connect to Google Storage. |
| 24 client *http.Client |
| 25 service *storage.Service |
| 26 } |
| 27 |
| 28 // NewGsUtil initializes and returns a utility for CT interations with Google |
| 29 // Storage. If client is nil then auth.RunFlow is invoked. if client is nil then |
| 30 // the client from GetOAuthClient is used. |
| 31 func NewGsUtil(client *http.Client) (*GsUtil, error) { |
| 32 if client == nil { |
| 33 oauthClient, err := GetOAuthClient() |
| 34 if err != nil { |
| 35 return nil, err |
| 36 } |
| 37 client = oauthClient |
| 38 } |
| 39 service, err := storage.New(client) |
| 40 if err != nil { |
| 41 return nil, fmt.Errorf("Failed to create interface to Google Sto
rage: %s", err) |
| 42 } |
| 43 return &GsUtil{client: client, service: service}, nil |
| 44 } |
| 45 |
| 46 func GetOAuthClient() (*http.Client, error) { |
| 47 config := auth.OAuthConfig(GSTokenPath, auth.SCOPE_READ_WRITE) |
| 48 return auth.RunFlow(config) |
| 49 } |
| 50 |
| 51 // Returns the response body of the specified GS object. Tries MAX_URI_GET_TRIES |
| 52 // times if download is unsuccessful. Client must close the response body when |
| 53 // finished with it. |
| 54 func getRespBody(res *storage.Object, client *http.Client) (io.ReadCloser, error
) { |
| 55 for i := 0; i < MAX_URI_GET_TRIES; i++ { |
| 56 glog.Infof("Fetching: %s", res.Name) |
| 57 request, err := gs.RequestForStorageURL(res.MediaLink) |
| 58 if err != nil { |
| 59 glog.Warningf("Unable to create Storage MediaURI request
: %s\n", err) |
| 60 continue |
| 61 } |
| 62 |
| 63 resp, err := client.Do(request) |
| 64 if err != nil { |
| 65 glog.Warningf("Unable to retrieve Storage MediaURI: %s",
err) |
| 66 continue |
| 67 } |
| 68 if resp.StatusCode != 200 { |
| 69 glog.Warningf("Failed to retrieve: %d %s", resp.StatusC
ode, resp.Status) |
| 70 resp.Body.Close() |
| 71 continue |
| 72 } |
| 73 return resp.Body, nil |
| 74 } |
| 75 return nil, fmt.Errorf("Failed fetching file after %d attempts", MAX_URI
_GET_TRIES) |
| 76 } |
| 77 |
| 78 // Returns the response body of the specified GS file. Client must close the |
| 79 // response body when finished with it. |
| 80 func (gs *GsUtil) GetRemoteFileContents(filePath string) (io.ReadCloser, error)
{ |
| 81 res, err := gs.service.Objects.Get(GS_BUCKET_NAME, filePath).Do() |
| 82 if err != nil { |
| 83 return nil, fmt.Errorf("Could not get %s from GS: %s", filePath,
err) |
| 84 } |
| 85 return getRespBody(res, gs.client) |
| 86 } |
| 87 |
| 88 // AreTimeStampsEqual checks whether the TIMESTAMP in the local dir matches the |
| 89 // TIMESTAMP in the remote Google Storage dir. |
| 90 func (gs *GsUtil) AreTimeStampsEqual(localDir, gsDir string) (bool, error) { |
| 91 // Get timestamp from the local directory. |
| 92 localTimestampPath := filepath.Join(localDir, TIMESTAMP_FILE_NAME) |
| 93 fileContent, err := ioutil.ReadFile(localTimestampPath) |
| 94 if err != nil { |
| 95 return false, fmt.Errorf("Could not read %s: %s", localTimestamp
Path, err) |
| 96 } |
| 97 localTimestamp := strings.Trim(string(fileContent), "\n") |
| 98 |
| 99 // Get timestamp from the Google Storage directory. |
| 100 gsTimestampPath := filepath.Join(gsDir, TIMESTAMP_FILE_NAME) |
| 101 respBody, err := gs.GetRemoteFileContents(gsTimestampPath) |
| 102 if err != nil { |
| 103 return false, err |
| 104 } |
| 105 defer respBody.Close() |
| 106 resp, err := ioutil.ReadAll(respBody) |
| 107 if err != nil { |
| 108 return false, err |
| 109 } |
| 110 gsTimestamp := strings.Trim(string(resp), "\n") |
| 111 |
| 112 // Return the comparison of the two timestamps. |
| 113 return localTimestamp == gsTimestamp, nil |
| 114 } |
| 115 |
| 116 // DownloadWorkerArtifacts downloads artifacts from Google Storage to a local di
r. |
| 117 func (gs *GsUtil) DownloadWorkerArtifacts(dirName, pagesetType string, workerNum
int) error { |
| 118 localDir := filepath.Join(StorageDir, dirName, pagesetType) |
| 119 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work
erNum)) |
| 120 |
| 121 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
| 122 // No need to download artifacts they already exist locally. |
| 123 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) |
| 124 return nil |
| 125 } |
| 126 glog.Infof("Timestamps between %s and %s are different. Downloading from
Google Storage", localDir, gsDir) |
| 127 // Empty the local dir. |
| 128 os.RemoveAll(localDir) |
| 129 // Create the local dir. |
| 130 os.MkdirAll(localDir, 0700) |
| 131 // Download from Google Storage. |
| 132 var wg sync.WaitGroup |
| 133 req := gs.service.Objects.List(GS_BUCKET_NAME).Prefix(gsDir + "/") |
| 134 for req != nil { |
| 135 resp, err := req.Do() |
| 136 if err != nil { |
| 137 return fmt.Errorf("Error occured while listing %s: %s",
gsDir, err) |
| 138 } |
| 139 for _, result := range resp.Items { |
| 140 fileName := filepath.Base(result.Name) |
| 141 |
| 142 wg.Add(1) |
| 143 go func() { |
| 144 defer wg.Done() |
| 145 respBody, err := getRespBody(result, gs.client) |
| 146 if err != nil { |
| 147 glog.Errorf("Could not fetch %s: %s", re
sult.MediaLink, err) |
| 148 return |
| 149 } |
| 150 defer respBody.Close() |
| 151 outputFile := filepath.Join(localDir, fileName) |
| 152 out, err := os.Create(outputFile) |
| 153 if err != nil { |
| 154 glog.Errorf("Unable to create file %s: %
s", outputFile, err) |
| 155 return |
| 156 } |
| 157 defer out.Close() |
| 158 if _, err = io.Copy(out, respBody); err != nil { |
| 159 glog.Error(err) |
| 160 return |
| 161 } |
| 162 glog.Infof("Downloaded gs://%s/%s to %s", GS_BUC
KET_NAME, result.Name, outputFile) |
| 163 }() |
| 164 } |
| 165 if len(resp.NextPageToken) > 0 { |
| 166 req.PageToken(resp.NextPageToken) |
| 167 } else { |
| 168 req = nil |
| 169 } |
| 170 } |
| 171 wg.Wait() |
| 172 return nil |
| 173 } |
| 174 |
| 175 func (gs *GsUtil) deleteRemoteDir(gsDir string) error { |
| 176 var wg sync.WaitGroup |
| 177 req := gs.service.Objects.List(GS_BUCKET_NAME).Prefix(gsDir + "/") |
| 178 for req != nil { |
| 179 resp, err := req.Do() |
| 180 if err != nil { |
| 181 return fmt.Errorf("Error occured while listing %s: %s",
gsDir, err) |
| 182 } |
| 183 for _, result := range resp.Items { |
| 184 wg.Add(1) |
| 185 filePath := result.Name |
| 186 go func() { |
| 187 defer wg.Done() |
| 188 if err := gs.service.Objects.Delete(GS_BUCKET_NA
ME, filePath).Do(); err != nil { |
| 189 glog.Errorf("Could not delete %s: %s", f
ilePath, err) |
| 190 return |
| 191 } |
| 192 glog.Infof("Deleted gs://%s/%s", GS_BUCKET_NAME,
filePath) |
| 193 }() |
| 194 } |
| 195 if len(resp.NextPageToken) > 0 { |
| 196 req.PageToken(resp.NextPageToken) |
| 197 } else { |
| 198 req = nil |
| 199 } |
| 200 } |
| 201 wg.Wait() |
| 202 return nil |
| 203 } |
| 204 |
| 205 // UploadWorkerArtifacts uploads artifacts from a local dir to Google Storage. |
| 206 func (gs *GsUtil) UploadWorkerArtifacts(dirName, pagesetType string, workerNum i
nt) error { |
| 207 localDir := filepath.Join(StorageDir, dirName, pagesetType) |
| 208 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work
erNum)) |
| 209 |
| 210 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
| 211 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir
) |
| 212 return nil |
| 213 } |
| 214 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo
gle Storage", localDir, gsDir) |
| 215 |
| 216 // Empty the remote dir. |
| 217 gs.deleteRemoteDir(gsDir) |
| 218 // List the local directory. |
| 219 fileInfos, err := ioutil.ReadDir(localDir) |
| 220 if err != nil { |
| 221 return fmt.Errorf("Unable to read the local dir %s: %s", localDi
r, err) |
| 222 } |
| 223 // Upload local files into the remote directory. |
| 224 var wg sync.WaitGroup |
| 225 for _, fileInfo := range fileInfos { |
| 226 fileName := fileInfo.Name() |
| 227 wg.Add(1) |
| 228 go func() { |
| 229 defer wg.Done() |
| 230 localFile := filepath.Join(localDir, fileName) |
| 231 gsFile := filepath.Join(gsDir, fileName) |
| 232 object := &storage.Object{Name: gsFile} |
| 233 f, err := os.Open(localFile) |
| 234 if err != nil { |
| 235 glog.Errorf("Error opening %s: %s", localFile, e
rr) |
| 236 return |
| 237 } |
| 238 defer f.Close() |
| 239 res, err := gs.service.Objects.Insert(GS_BUCKET_NAME, ob
ject).Media(f).Do() |
| 240 if err != nil { |
| 241 glog.Errorf("Objects.Insert failed: %s", err) |
| 242 return |
| 243 } |
| 244 glog.Infof("Created object %s at location %s", res.Name,
res.SelfLink) |
| 245 }() |
| 246 } |
| 247 wg.Wait() |
| 248 return nil |
| 249 } |
OLD | NEW |