Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(386)

Side by Side Diff: ct/go/util/gs.go

Issue 779633003: CT Google Storage utils to download/upload artifacts for workers (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Fix unit test Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Google Storage utility that contains methods for both CT master and worker
2 // scripts.
3 package util
4
5 import (
6 "fmt"
7 "io"
8 "io/ioutil"
9 "net/http"
10 "os"
11 "path/filepath"
12 "strings"
13 "sync"
14
15 "github.com/golang/glog"
16
17 "code.google.com/p/google-api-go-client/storage/v1"
18 "skia.googlesource.com/buildbot.git/go/auth"
19 "skia.googlesource.com/buildbot.git/go/gs"
20 )
21
22 type GsUtil struct {
23 // The client used to connect to Google Storage.
24 client *http.Client
25 service *storage.Service
26 }
27
28 // NewGsUtil initializes and returns a utility for CT interations with Google
29 // Storage. If client is nil then auth.RunFlow is invoked. if client is nil then
30 // the client from GetOAuthClient is used.
31 func NewGsUtil(client *http.Client) (*GsUtil, error) {
32 if client == nil {
33 oauthClient, err := GetOAuthClient()
34 if err != nil {
35 return nil, err
36 }
37 client = oauthClient
38 }
39 service, err := storage.New(client)
40 if err != nil {
41 return nil, fmt.Errorf("Failed to create interface to Google Sto rage: %s", err)
42 }
43 return &GsUtil{client: client, service: service}, nil
44 }
45
46 func GetOAuthClient() (*http.Client, error) {
47 config := auth.OAuthConfig(GSTokenPath, auth.SCOPE_READ_WRITE)
48 return auth.RunFlow(config)
49 }
50
51 // Returns the response body of the specified GS object. Tries MAX_URI_GET_TRIES
52 // times if download is unsuccessful. Client must close the response body when
53 // finished with it.
54 func getRespBody(res *storage.Object, client *http.Client) (io.ReadCloser, error ) {
55 for i := 0; i < MAX_URI_GET_TRIES; i++ {
56 glog.Infof("Fetching: %s", res.Name)
57 request, err := gs.RequestForStorageURL(res.MediaLink)
58 if err != nil {
59 glog.Warningf("Unable to create Storage MediaURI request : %s\n", err)
60 continue
61 }
62
63 resp, err := client.Do(request)
64 if err != nil {
65 glog.Warningf("Unable to retrieve Storage MediaURI: %s", err)
66 continue
67 }
68 if resp.StatusCode != 200 {
69 glog.Warningf("Failed to retrieve: %d %s", resp.StatusC ode, resp.Status)
70 resp.Body.Close()
71 continue
72 }
73 return resp.Body, nil
74 }
75 return nil, fmt.Errorf("Failed fetching file after %d attempts", MAX_URI _GET_TRIES)
76 }
77
78 // Returns the response body of the specified GS file. Client must close the
79 // response body when finished with it.
80 func (gs *GsUtil) GetRemoteFileContents(filePath string) (io.ReadCloser, error) {
81 res, err := gs.service.Objects.Get(GS_BUCKET_NAME, filePath).Do()
82 if err != nil {
83 return nil, fmt.Errorf("Could not get %s from GS: %s", filePath, err)
84 }
85 return getRespBody(res, gs.client)
86 }
87
88 // AreTimeStampsEqual checks whether the TIMESTAMP in the local dir matches the
89 // TIMESTAMP in the remote Google Storage dir.
90 func (gs *GsUtil) AreTimeStampsEqual(localDir, gsDir string) (bool, error) {
91 // Get timestamp from the local directory.
92 localTimestampPath := filepath.Join(localDir, TIMESTAMP_FILE_NAME)
93 fileContent, err := ioutil.ReadFile(localTimestampPath)
94 if err != nil {
95 return false, fmt.Errorf("Could not read %s: %s", localTimestamp Path, err)
96 }
97 localTimestamp := strings.Trim(string(fileContent), "\n")
98
99 // Get timestamp from the Google Storage directory.
100 gsTimestampPath := filepath.Join(gsDir, TIMESTAMP_FILE_NAME)
101 respBody, err := gs.GetRemoteFileContents(gsTimestampPath)
102 if err != nil {
103 return false, err
104 }
105 defer respBody.Close()
106 resp, err := ioutil.ReadAll(respBody)
107 if err != nil {
108 return false, err
109 }
110 gsTimestamp := strings.Trim(string(resp), "\n")
111
112 // Return the comparison of the two timestamps.
113 return localTimestamp == gsTimestamp, nil
114 }
115
116 // DownloadWorkerArtifacts downloads artifacts from Google Storage to a local di r.
117 func (gs *GsUtil) DownloadWorkerArtifacts(dirName, pagesetType string, workerNum int) error {
118 localDir := filepath.Join(StorageDir, dirName, pagesetType)
119 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum))
120
121 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal {
122 // No need to download artifacts they already exist locally.
123 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir)
124 return nil
125 }
126 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir)
127 // Empty the local dir.
128 os.RemoveAll(localDir)
129 // Create the local dir.
130 os.MkdirAll(localDir, 0700)
131 // Download from Google Storage.
132 var wg sync.WaitGroup
133 req := gs.service.Objects.List(GS_BUCKET_NAME).Prefix(gsDir + "/")
134 for req != nil {
135 resp, err := req.Do()
136 if err != nil {
137 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err)
138 }
139 for _, result := range resp.Items {
140 fileName := filepath.Base(result.Name)
141
142 wg.Add(1)
143 go func() {
144 defer wg.Done()
145 respBody, err := getRespBody(result, gs.client)
146 if err != nil {
147 glog.Errorf("Could not fetch %s: %s", re sult.MediaLink, err)
148 return
149 }
150 defer respBody.Close()
151 outputFile := filepath.Join(localDir, fileName)
152 out, err := os.Create(outputFile)
153 if err != nil {
154 glog.Errorf("Unable to create file %s: % s", outputFile, err)
155 return
156 }
157 defer out.Close()
158 if _, err = io.Copy(out, respBody); err != nil {
159 glog.Error(err)
160 return
161 }
162 glog.Infof("Downloaded gs://%s/%s to %s", GS_BUC KET_NAME, result.Name, outputFile)
163 }()
164 }
165 if len(resp.NextPageToken) > 0 {
166 req.PageToken(resp.NextPageToken)
167 } else {
168 req = nil
169 }
170 }
171 wg.Wait()
172 return nil
173 }
174
175 func (gs *GsUtil) deleteRemoteDir(gsDir string) error {
176 var wg sync.WaitGroup
177 req := gs.service.Objects.List(GS_BUCKET_NAME).Prefix(gsDir + "/")
178 for req != nil {
179 resp, err := req.Do()
180 if err != nil {
181 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err)
182 }
183 for _, result := range resp.Items {
184 wg.Add(1)
185 filePath := result.Name
186 go func() {
187 defer wg.Done()
188 if err := gs.service.Objects.Delete(GS_BUCKET_NA ME, filePath).Do(); err != nil {
189 glog.Errorf("Could not delete %s: %s", f ilePath, err)
190 return
191 }
192 glog.Infof("Deleted gs://%s/%s", GS_BUCKET_NAME, filePath)
193 }()
194 }
195 if len(resp.NextPageToken) > 0 {
196 req.PageToken(resp.NextPageToken)
197 } else {
198 req = nil
199 }
200 }
201 wg.Wait()
202 return nil
203 }
204
205 // UploadWorkerArtifacts uploads artifacts from a local dir to Google Storage.
206 func (gs *GsUtil) UploadWorkerArtifacts(dirName, pagesetType string, workerNum i nt) error {
207 localDir := filepath.Join(StorageDir, dirName, pagesetType)
208 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum))
209
210 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal {
211 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir )
212 return nil
213 }
214 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir)
215
216 // Empty the remote dir.
217 gs.deleteRemoteDir(gsDir)
218 // List the local directory.
219 fileInfos, err := ioutil.ReadDir(localDir)
220 if err != nil {
221 return fmt.Errorf("Unable to read the local dir %s: %s", localDi r, err)
222 }
223 // Upload local files into the remote directory.
224 var wg sync.WaitGroup
225 for _, fileInfo := range fileInfos {
226 fileName := fileInfo.Name()
227 wg.Add(1)
228 go func() {
229 defer wg.Done()
230 localFile := filepath.Join(localDir, fileName)
231 gsFile := filepath.Join(gsDir, fileName)
232 object := &storage.Object{Name: gsFile}
233 f, err := os.Open(localFile)
234 if err != nil {
235 glog.Errorf("Error opening %s: %s", localFile, e rr)
236 return
237 }
238 defer f.Close()
239 res, err := gs.service.Objects.Insert(GS_BUCKET_NAME, ob ject).Media(f).Do()
240 if err != nil {
241 glog.Errorf("Objects.Insert failed: %s", err)
242 return
243 }
244 glog.Infof("Created object %s at location %s", res.Name, res.SelfLink)
245 }()
246 }
247 wg.Wait()
248 return nil
249 }
OLDNEW
« ct/go/util/constants.go ('K') | « ct/go/util/constants.go ('k') | ct/go/util/gs_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698