Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(63)

Side by Side Diff: go/src/infra/tools/cipd/storage.go

Issue 1129043003: cipd: Refactor client to make it more readable. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « go/src/infra/tools/cipd/remote_test.go ('k') | go/src/infra/tools/cipd/storage_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package cipd
6
7 import (
8 "errors"
9 "fmt"
10 "io"
11 "net/http"
12 "os"
13 "time"
14 )
15
16 const (
17 // uploadChunkSize is the max size of a single PUT HTTP request during u pload.
18 uploadChunkSize int64 = 4 * 1024 * 1024
19 // uploadMaxErrors is the number of transient errors after which upload is aborted.
20 uploadMaxErrors = 20
21 // downloadReportInterval defines frequency of "downloaded X%" log lines .
22 downloadReportInterval = 5 * time.Second
23 // downloadMaxAttempts is how many times to retry a download on errors.
24 downloadMaxAttempts = 10
25 )
26
27 // errTransientError is returned by getNextOffset in case of retryable error.
28 var errTransientError = errors.New("Transient error in getUploadedOffset")
29
30 // storageImpl implements storage via Google Storage signed URLs.
31 type storageImpl struct {
32 client *Client
33 chunkSize int64
34 }
35
36 // Google Storage resumable upload protocol.
37 // See https://cloud.google.com/storage/docs/concepts-techniques#resumable
38
39 func (s *storageImpl) upload(url string, data io.ReadSeeker) error {
40 // Grab the total length of the file.
41 length, err := data.Seek(0, os.SEEK_END)
42 if err != nil {
43 return err
44 }
45 _, err = data.Seek(0, os.SEEK_SET)
46 if err != nil {
47 return err
48 }
49
50 // Offset of data known to be persisted in the storage or -1 if needed t o ask
51 // Google Storage for it.
52 offset := int64(0)
53 // Number of transient errors reported so far.
54 errors := 0
55
56 // Called when some new data is uploaded.
57 reportProgress := func(offset int64) {
58 if length != 0 {
59 s.client.Log.Infof("cipd: uploading - %d%%", offset*100/ length)
60 }
61 }
62
63 // Called when transient error happens.
64 reportTransientError := func() {
65 // Need to query for latest uploaded offset to resume.
66 errors++
67 offset = -1
68 if errors < uploadMaxErrors {
69 s.client.Log.Warningf("cipd: transient upload error, ret rying...")
70 s.client.clock.sleep(2 * time.Second)
71 }
72 }
73
74 reportProgress(0)
75 for errors < uploadMaxErrors {
76 // Grab latest uploaded offset if not known.
77 if offset == -1 {
78 offset, err = s.getNextOffset(url, length)
79 if err == errTransientError {
80 reportTransientError()
81 continue
82 }
83 if err != nil {
84 return err
85 }
86 reportProgress(offset)
87 if offset == length {
88 return nil
89 }
90 s.client.Log.Warningf("cipd: resuming upload from offset %d", offset)
91 }
92
93 // Length of a chunk to upload.
94 chunk := s.chunkSize
95 if chunk > length-offset {
96 chunk = length - offset
97 }
98
99 // Upload the chunk.
100 data.Seek(offset, os.SEEK_SET)
101 r, err := http.NewRequest("PUT", url, io.LimitReader(data, chunk ))
102 if err != nil {
103 return err
104 }
105 rangeHeader := fmt.Sprintf("bytes %d-%d/%d", offset, offset+chun k-1, length)
106 r.Header.Set("Content-Range", rangeHeader)
107 r.Header.Set("Content-Length", fmt.Sprintf("%d", chunk))
108 r.Header.Set("User-Agent", s.client.UserAgent)
109 resp, err := s.client.doAnonymousHTTPRequest(r)
110 if err != nil {
111 if isTemporaryNetError(err) {
112 reportTransientError()
113 continue
114 }
115 return err
116 }
117 resp.Body.Close()
118
119 // Partially or fully uploaded.
120 if resp.StatusCode == 308 || resp.StatusCode == 200 {
121 offset += chunk
122 reportProgress(offset)
123 if offset == length {
124 return nil
125 }
126 continue
127 }
128
129 // Transient error.
130 if isTemporaryHTTPError(resp.StatusCode) {
131 reportTransientError()
132 continue
133 }
134
135 // Fatal error.
136 return fmt.Errorf("Unexpected response during file upload: HTTP %d", resp.StatusCode)
137 }
138
139 return ErrUploadError
140 }
141
142 // getNextOffset queries the storage for size of persisted data.
143 func (s *storageImpl) getNextOffset(url string, length int64) (offset int64, err error) {
144 r, err := http.NewRequest("PUT", url, nil)
145 if err != nil {
146 return
147 }
148 r.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", length))
149 r.Header.Set("Content-Length", "0")
150 r.Header.Set("User-Agent", s.client.UserAgent)
151 resp, err := s.client.doAnonymousHTTPRequest(r)
152 if err != nil {
153 if isTemporaryNetError(err) {
154 err = errTransientError
155 }
156 return
157 }
158 resp.Body.Close()
159
160 if resp.StatusCode == 200 {
161 // Fully uploaded.
162 offset = length
163 } else if resp.StatusCode == 308 {
164 // Partially uploaded, extract last uploaded offset from Range h eader.
165 rangeHeader := resp.Header.Get("Range")
166 if rangeHeader != "" {
167 _, err = fmt.Sscanf(rangeHeader, "bytes=0-%d", &offset)
168 if err == nil {
169 // |offset| is an *offset* of a last uploaded by te, not the data length.
170 offset++
171 }
172 }
173 } else if isTemporaryHTTPError(resp.StatusCode) {
174 err = errTransientError
175 } else {
176 err = fmt.Errorf("Unexpected response (HTTP %d) when querying fo r uploaded offset", resp.StatusCode)
177 }
178 return
179 }
180
181 // TODO(vadimsh): Use resumable download protocol.
182
183 func (s *storageImpl) download(url string, output io.WriteSeeker) error {
184 // reportProgress print fetch progress, throttling the reports rate.
185 var prevProgress int64 = 1000 // >100%
186 var prevReportTs time.Time
187 reportProgress := func(read int64, total int64) {
188 now := s.client.clock.now()
189 progress := read * 100 / total
190 if progress < prevProgress || read == total || now.Sub(prevRepor tTs) > downloadReportInterval {
191 s.client.Log.Infof("cipd: fetching - %d%%", progress)
192 prevReportTs = now
193 prevProgress = progress
194 }
195 }
196
197 // download is a separate function to be able to use deferred close.
198 download := func(out io.WriteSeeker, src io.ReadCloser, totalLen int64) error {
199 defer src.Close()
200 s.client.Log.Infof("cipd: about to fetch %.1f Mb", float32(total Len)/1024.0/1024.0)
201 reportProgress(0, totalLen)
202 _, err := io.Copy(out, &readerWithProgress{
203 reader: src,
204 callback: func(read int64) { reportProgress(read, totalL en) },
205 })
206 if err == nil {
207 s.client.Log.Infof("cipd: fetch finished successfully")
208 }
209 return err
210 }
211
212 // Download the actual data (several attempts).
213 for attempt := 0; attempt < downloadMaxAttempts; attempt++ {
214 // Rewind output to zero offset.
215 _, err := output.Seek(0, os.SEEK_SET)
216 if err != nil {
217 return err
218 }
219
220 // Send the request.
221 s.client.Log.Infof("cipd: initiating the fetch")
222 var req *http.Request
223 var resp *http.Response
224 req, err = http.NewRequest("GET", url, nil)
225 if err != nil {
226 return err
227 }
228 req.Header.Set("User-Agent", s.client.UserAgent)
229 resp, err = s.client.doAnonymousHTTPRequest(req)
230 if err != nil {
231 if isTemporaryNetError(err) {
232 s.client.Log.Warningf("cipd: transient network e rror: %s", err)
233 continue
234 }
235 return err
236 }
237
238 // Transient error, retry.
239 if isTemporaryHTTPError(resp.StatusCode) {
240 resp.Body.Close()
241 s.client.Log.Warningf("cipd: transient HTTP error %d whi le fetching the file", resp.StatusCode)
242 continue
243 }
244
245 // Fatal error, abort.
246 if resp.StatusCode >= 400 {
247 resp.Body.Close()
248 return fmt.Errorf("Server replied with HTTP code %d", re sp.StatusCode)
249 }
250
251 // Try to fetch (will close resp.Body when done).
252 err = download(output, resp.Body, resp.ContentLength)
253 if err != nil {
254 s.client.Log.Warningf("cipd: transient error fetching th e file: %s", err)
255 continue
256 }
257
258 // Success.
259 return nil
260 }
261
262 return ErrDownloadError
263 }
264
265 // readerWithProgress is io.Reader that calls callback whenever something is
266 // read from it.
267 type readerWithProgress struct {
268 reader io.Reader
269 total int64
270 callback func(total int64)
271 }
272
273 func (r *readerWithProgress) Read(p []byte) (int, error) {
274 n, err := r.reader.Read(p)
275 r.total += int64(n)
276 r.callback(r.total)
277 return n, err
278 }
OLDNEW
« no previous file with comments | « go/src/infra/tools/cipd/remote_test.go ('k') | go/src/infra/tools/cipd/storage_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698