Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(280)

Side by Side Diff: go/src/infra/tools/cipd/storage.go

Issue 1129043003: cipd: Refactor client to make it more readable. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package cipd
6
7 import (
8 "errors"
9 "fmt"
10 "io"
11 "net/http"
12 "os"
13 "time"
14 )
15
16 const (
17 // uploadChunkSize is the max size of a single PUT HTTP request during u pload.
18 uploadChunkSize int64 = 4 * 1024 * 1024
19 // uploadMaxErrors is the number of transient errors after which upload is aborted.
20 uploadMaxErrors = 20
21 // downloadReportInterval defines frequency of "downloaded X%" log lines .
22 downloadReportInterval = 5 * time.Second
23 // downloadMaxAttempts is how many times to retry a download on errors.
24 downloadMaxAttempts = 10
25 )
26
27 // errTransientError is returned by getNextOffset in case of retryable error.
28 var errTransientError = errors.New("Transient error in getUploadedOffset")
29
30 // storageImpl implements storage via Google Storage signed URLs.
31 type storageImpl struct {
32 client *Client
33 chunkSize int64
34 }
35
36 // Google Storage resumable upload protocol.
37 // See https://cloud.google.com/storage/docs/concepts-techniques#resumable
38
39 func (s *storageImpl) upload(url string, data io.ReadSeeker) error {
40 // Grab the total length of the file.
41 length, err := data.Seek(0, os.SEEK_END)
42 if err != nil {
43 return err
44 }
45 _, err = data.Seek(0, os.SEEK_SET)
46 if err != nil {
47 return err
48 }
49
50 // Offset of data known to be persisted in the storage or -1 if needed t o ask
51 // Google Storage for it.
52 offset := int64(0)
53 // Number of transient errors reported so far.
54 errors := 0
55
56 // Called when some new data is uploaded.
57 reportProgress := func(offset int64) {
58 if length != 0 {
59 s.client.Log.Infof("cipd: uploading - %d%%", offset*100/ length)
60 }
61 }
62
63 // Called when transient error happens.
64 reportTransientError := func() {
65 // Need to query for latest uploaded offset to resume.
66 offset = -1
nodir 2015/05/13 01:25:29 errors += 1
Vadim Sh. 2015/05/13 03:08:06 Done. Also deduplicated HTTP status code handling
67 s.client.Log.Warningf("cipd: transient upload error, retrying... ")
68 s.client.clock.sleep(2 * time.Second)
69 }
70
71 reportProgress(0)
72 for errors < uploadMaxErrors {
73 // Grab latest uploaded offset if not known.
74 if offset == -1 {
75 offset, err = s.getNextOffset(url, length)
76 if err == errTransientError {
77 reportTransientError()
78 continue
79 }
80 if err != nil {
81 return err
82 }
83 reportProgress(offset)
84 if offset == length {
85 return nil
86 }
87 s.client.Log.Warningf("cipd: resuming upload from offset %d", offset)
88 }
89
90 // Length of a chunk to upload.
91 chunk := s.chunkSize
92 if chunk > length-offset {
93 chunk = length - offset
94 }
95
96 // Upload the chunk.
97 data.Seek(offset, os.SEEK_SET)
98 r, err := http.NewRequest("PUT", url, io.LimitReader(data, chunk ))
99 if err != nil {
100 return err
101 }
102 rangeHeader := fmt.Sprintf("bytes %d-%d/%d", offset, offset+chun k-1, length)
103 r.Header.Set("Content-Range", rangeHeader)
104 r.Header.Set("Content-Length", fmt.Sprintf("%d", chunk))
105 r.Header.Set("User-Agent", s.client.UserAgent)
106 resp, err := s.client.doAnonymousHTTPRequest(r)
107 if err != nil {
108 if isTemporaryNetError(err) {
109 reportTransientError()
110 continue
111 }
112 return err
113 }
114 resp.Body.Close()
115
116 // Partially or fully uploaded.
117 if resp.StatusCode == 308 || resp.StatusCode == 200 {
118 offset += chunk
119 reportProgress(offset)
120 if offset == length {
121 return nil
122 }
123 continue
124 }
125
126 // Fatal error.
127 if resp.StatusCode < 500 && resp.StatusCode != 408 {
128 return fmt.Errorf("Unexpected response during file uploa d: HTTP %d", resp.StatusCode)
129 }
130
131 // Transient error.
132 reportTransientError()
133 }
134
135 return ErrUploadError
136 }
137
138 // getNextOffset queries the storage for size of persisted data.
139 func (s *storageImpl) getNextOffset(url string, length int64) (offset int64, err error) {
140 r, err := http.NewRequest("PUT", url, nil)
141 if err != nil {
142 return
143 }
144 r.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", length))
145 r.Header.Set("Content-Length", "0")
146 r.Header.Set("User-Agent", s.client.UserAgent)
147 resp, err := s.client.doAnonymousHTTPRequest(r)
148 if err != nil {
149 if isTemporaryNetError(err) {
150 err = errTransientError
151 }
152 return
153 }
154 resp.Body.Close()
155
156 if resp.StatusCode == 200 {
157 // Fully uploaded.
158 offset = length
159 } else if resp.StatusCode == 308 {
160 // Partially uploaded, extract last uploaded offset from Range h eader.
161 rangeHeader := resp.Header.Get("Range")
162 if rangeHeader != "" {
163 _, err = fmt.Sscanf(rangeHeader, "bytes=0-%d", &offset)
164 if err == nil {
165 // |offset| is an *offset* of a last uploaded by te, not the data length.
166 offset++
167 }
168 }
169 } else if resp.StatusCode < 500 && resp.StatusCode != 408 {
170 err = fmt.Errorf("Unexpected response (HTTP %d) when querying fo r uploaded offset", resp.StatusCode)
171 } else {
172 err = errTransientError
173 }
174 return
175 }
176
177 // TODO(vadimsh): Use resumable download protocol.
178
179 func (s *storageImpl) download(url string, output io.WriteSeeker) error {
180 // reportProgress print fetch progress, throttling the reports rate.
181 var prevProgress int64 = 1000 // >100%
182 var prevReportTs time.Time
183 reportProgress := func(read int64, total int64) {
184 now := s.client.clock.now()
185 progress := read * 100 / total
186 if progress < prevProgress || read == total || now.Sub(prevRepor tTs) > downloadReportInterval {
187 s.client.Log.Infof("cipd: fetching - %d%%", progress)
188 prevReportTs = now
189 prevProgress = progress
190 }
191 }
192
193 // download is a separate function to be able to use deferred close.
194 download := func(out io.WriteSeeker, src io.ReadCloser, totalLen int64) error {
195 defer src.Close()
196 s.client.Log.Infof("cipd: about to fetch %.1f Mb", float32(total Len)/1024.0/1024.0)
197 reportProgress(0, totalLen)
198 _, err := io.Copy(out, &readerWithProgress{
199 reader: src,
200 callback: func(read int64) { reportProgress(read, totalL en) },
201 })
202 if err == nil {
203 s.client.Log.Infof("cipd: fetch finished successfully")
204 }
205 return err
206 }
207
208 // Download the actual data (several attempts).
209 for attempt := 0; attempt < downloadMaxAttempts; attempt++ {
210 // Rewind output to zero offset.
211 _, err := output.Seek(0, os.SEEK_SET)
212 if err != nil {
213 return err
214 }
215
216 // Send the request.
217 s.client.Log.Infof("cipd: initiating the fetch")
218 var req *http.Request
219 var resp *http.Response
220 req, err = http.NewRequest("GET", url, nil)
221 if err != nil {
222 return err
223 }
224 req.Header.Set("User-Agent", s.client.UserAgent)
225 resp, err = s.client.doAnonymousHTTPRequest(req)
226 if err != nil {
227 if isTemporaryNetError(err) {
228 s.client.Log.Warningf("cipd: transient network e rror: %s", err)
229 continue
230 }
231 return err
232 }
233
234 // Transient error, retry.
235 if resp.StatusCode == 408 || resp.StatusCode >= 500 {
236 resp.Body.Close()
237 s.client.Log.Warningf("cipd: transient HTTP error %d whi le fetching the file", resp.StatusCode)
238 continue
239 }
240
241 // Fatal error, abort.
242 if resp.StatusCode >= 400 {
243 resp.Body.Close()
244 return fmt.Errorf("Server replied with HTTP code %d", re sp.StatusCode)
245 }
246
247 // Try to fetch (will close resp.Body when done).
248 err = download(output, resp.Body, resp.ContentLength)
249 if err != nil {
250 s.client.Log.Warningf("cipd: transient error fetching th e file: %s", err)
251 continue
252 }
253
254 // Success.
255 return nil
256 }
257
258 return ErrDownloadError
259 }
260
261 // readerWithProgress is io.Reader that calls callback whenever something is
262 // read from it.
263 type readerWithProgress struct {
264 reader io.Reader
265 total int64
266 callback func(total int64)
267 }
268
269 func (r *readerWithProgress) Read(p []byte) (int, error) {
270 n, err := r.reader.Read(p)
271 r.total += int64(n)
272 r.callback(r.total)
273 return n, err
274 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698