Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(694)

Side by Side Diff: go/src/infra/tools/cipd/uploader.go

Issue 1129043003: cipd: Refactor client to make it more readable. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « go/src/infra/tools/cipd/storage_test.go ('k') | go/src/infra/tools/cipd/uploader_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package cipd
6
7 import (
8 "errors"
9 "fmt"
10 "io"
11 "net/http"
12 "os"
13 "time"
14
15 "infra/libs/logging"
16 )
17
18 var (
19 // ErrFinalizationTimeout is returned if CAS service can not finalize up load fast enough.
20 ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS servi ce to finalize the upload")
21 // ErrAttachTagsTimeout is returned when service refuses to accept tags for a long time.
22 ErrAttachTagsTimeout = errors.New("Timeout while attaching tags")
23 )
24
25 // UploadOptions contains upload related parameters shared by UploadToCAS and
26 // RegisterInstance functions.
27 type UploadOptions struct {
28 // ServiceURL is root URL of the backend service, or "" to use default s ervice.
29 ServiceURL string
30 // FinalizationTimeout is how long to wait for CAS service to finalize t he upload, default is 1 min.
31 FinalizationTimeout time.Duration
32 // Client is http.Client to use to make requests, default is http.Defaul tClient.
33 Client *http.Client
34 // Log is a logger to use for logs, default is logging.DefaultLogger.
35 Log logging.Logger
36 }
37
38 // UploadToCASOptions contains parameters for UploadToCAS function.
39 type UploadToCASOptions struct {
40 UploadOptions
41
42 // SHA1 is a SHA1 hash of data to upload, usually package's InstanceID() .
43 SHA1 string
44 // Data provides actual data to upload. It is seekable to support resuma ble uploads.
45 Data io.ReadSeeker
46 // UploadSessionID identified existing upload session. Empty string to s tart a new one.
47 UploadSessionID string
48 // UploadURL is where to upload the file to. Must be set if UploadSessio nID is not empty.
49 UploadURL string
50
51 // remote is an instance of remoteService to use (if set).
52 remote *remoteService
53 }
54
55 // UploadToCAS uploads package data blob to Content Addressed Store if it is not
56 // there already. The data is addressed by SHA1 hash (also known as package's
57 // InstanceID). It can be used as a standalone function (if UploadSessionID
58 // is "") or as a part of more high level upload process (in that case upload
59 // session can be opened elsewhere and its properties passed here via
60 // UploadSessionID and UploadURL). Returns nil on successful upload.
61 func UploadToCAS(options UploadToCASOptions) error {
62 // Fill in default options.
63 if options.ServiceURL == "" {
64 options.ServiceURL = DefaultServiceURL()
65 }
66 if options.FinalizationTimeout == 0 {
67 options.FinalizationTimeout = 60 * time.Second
68 }
69 if options.Client == nil {
70 options.Client = http.DefaultClient
71 }
72 if options.Log == nil {
73 options.Log = logging.DefaultLogger
74 }
75 if options.remote == nil {
76 options.remote = newRemoteService(options.Client, options.Servic eURL, options.Log)
77 }
78 log := options.Log
79 remote := options.remote
80
81 // Open new upload session if existing is not provided.
82 var session *uploadSession
83 var err error
84 if options.UploadSessionID == "" {
85 log.Infof("cipd: uploading %s: initiating", options.SHA1)
86 session, err = remote.initiateUpload(options.SHA1)
87 if err != nil {
88 log.Warningf("cipd: can't upload %s - %s", options.SHA1, err)
89 return err
90 }
91 if session == nil {
92 log.Infof("cipd: %s is already uploaded", options.SHA1)
93 return nil
94 }
95 } else {
96 if options.UploadURL == "" {
97 return errors.New("UploadURL must be set if UploadSessio nID is used")
98 }
99 session = &uploadSession{
100 ID: options.UploadSessionID,
101 URL: options.UploadURL,
102 }
103 }
104
105 // Upload the file to CAS storage.
106 err = resumableUpload(session.URL, 8*1024*1024, options)
107 if err != nil {
108 return err
109 }
110
111 // Finalize the upload, wait until server verifies and publishes the fil e.
112 started := clock.Now()
113 delay := time.Second
114 for {
115 published, err := remote.finalizeUpload(session.ID)
116 if published {
117 log.Infof("cipd: successfully uploaded %s", options.SHA1 )
118 return nil
119 }
120 if err != nil {
121 log.Warningf("cipd: upload of %s failed: %s", options.SH A1, err)
122 return err
123 }
124 if clock.Now().Sub(started) > options.FinalizationTimeout {
125 log.Warningf("cipd: upload of %s failed: timeout", optio ns.SHA1)
126 return ErrFinalizationTimeout
127 }
128 log.Infof("cipd: uploading %s: verifying", options.SHA1)
129 clock.Sleep(delay)
130 if delay < 4*time.Second {
131 delay += 500 * time.Millisecond
132 }
133 }
134 }
135
136 // RegisterInstanceOptions contains parameters for RegisterInstance function.
137 type RegisterInstanceOptions struct {
138 UploadOptions
139
140 // PackageInstance is a package instance to register.
141 PackageInstance PackageInstance
142 // Tags is a list of tags to attach to an instance. Will be attached eve n if
143 // the instance already existed before.
144 Tags []string
145 }
146
147 // RegisterInstance makes the package instance available for clients by
148 // uploading it to the storage and registering it in the package repository.
149 func RegisterInstance(options RegisterInstanceOptions) error {
150 // Fill in default options.
151 if options.ServiceURL == "" {
152 options.ServiceURL = DefaultServiceURL()
153 }
154 if options.Client == nil {
155 options.Client = http.DefaultClient
156 }
157 if options.Log == nil {
158 options.Log = logging.DefaultLogger
159 }
160 log := options.Log
161 inst := options.PackageInstance
162 remote := newRemoteService(options.Client, options.ServiceURL, log)
163
164 // Attempt to register.
165 result, err := remote.registerInstance(inst.PackageName(), inst.Instance ID())
166 if err != nil {
167 return err
168 }
169
170 // Asked to upload the package file to CAS first?
171 if result.UploadSession != nil {
172 err = UploadToCAS(UploadToCASOptions{
173 UploadOptions: options.UploadOptions,
174 SHA1: inst.InstanceID(),
175 Data: inst.DataReader(),
176 UploadSessionID: result.UploadSession.ID,
177 UploadURL: result.UploadSession.URL,
178 remote: remote,
179 })
180 if err != nil {
181 return err
182 }
183 // Try again, now that file is uploaded.
184 result, err = remote.registerInstance(inst.PackageName(), inst.I nstanceID())
185 if err != nil {
186 return err
187 }
188 if result.UploadSession != nil {
189 return errors.New("Package file is uploaded, but servers asks us to upload it again")
190 }
191 }
192
193 if result.AlreadyRegistered {
194 log.Infof(
195 "cipd: instance %s:%s is already registered by %s on %s" ,
196 inst.PackageName(), inst.InstanceID(),
197 result.Info.RegisteredBy, result.Info.RegisteredTs)
198 } else {
199 log.Infof("cipd: instance %s:%s was successfully registered", in st.PackageName(), inst.InstanceID())
200 }
201
202 return attachTagsWhenReady(remote, inst.PackageName(), inst.InstanceID() , options.Tags, log)
203 }
204
205 ////////////////////////////////////////////////////////////////////////////////
206 // Google Storage resumable upload protocol.
207 // See https://cloud.google.com/storage/docs/concepts-techniques#resumable
208
209 // errTransientError is returned by getNextOffset in case of retryable error.
210 var errTransientError = errors.New("Transient error in getUploadedOffset")
211
212 // resumableUpload is mocked in tests.
213 var resumableUpload = func(uploadURL string, chunkSize int64, opts UploadToCASOp tions) error {
214 // Grab the total length of the file.
215 length, err := opts.Data.Seek(0, os.SEEK_END)
216 if err != nil {
217 return err
218 }
219 _, err = opts.Data.Seek(0, os.SEEK_SET)
220 if err != nil {
221 return err
222 }
223
224 // Called when some new data is uploaded.
225 reportProgress := func(offset int64) {
226 if length != 0 {
227 opts.Log.Infof("cipd: uploading %s: %d%%", opts.SHA1, of fset*100/length)
228 }
229 }
230
231 // Called when transient error happens.
232 reportTransientError := func() {
233 opts.Log.Warningf("cipd: transient upload error, retrying...")
234 clock.Sleep(2 * time.Second)
235 }
236
237 var offset int64
238 reportProgress(0)
239 for {
240 // Grab latest uploaded offset if not known.
241 if offset == -1 {
242 offset, err = getNextOffset(uploadURL, length, opts.Clie nt)
243 if err == errTransientError {
244 offset = -1
245 reportTransientError()
246 continue
247 }
248 if err != nil {
249 return err
250 }
251 reportProgress(offset)
252 if offset == length {
253 return nil
254 }
255 opts.Log.Warningf("cipd: resuming upload from offset %d" , offset)
256 }
257
258 // Length of a chunk to upload.
259 var chunk int64 = chunkSize
260 if chunk > length-offset {
261 chunk = length - offset
262 }
263
264 // Upload the chunk.
265 opts.Data.Seek(offset, os.SEEK_SET)
266 r, err := http.NewRequest("PUT", uploadURL, io.LimitReader(opts. Data, chunk))
267 if err != nil {
268 return err
269 }
270 rangeHeader := fmt.Sprintf("bytes %d-%d/%d", offset, offset+chun k-1, length)
271 r.Header.Set("Content-Range", rangeHeader)
272 r.Header.Set("Content-Length", fmt.Sprintf("%d", chunk))
273 r.Header.Set("User-Agent", userAgent())
274 resp, err := opts.Client.Do(r)
275 if err != nil {
276 return err
277 }
278 resp.Body.Close()
279
280 // Partially or fully uploaded.
281 if resp.StatusCode == 308 || resp.StatusCode == 200 {
282 offset += chunk
283 reportProgress(offset)
284 if offset == length {
285 return nil
286 }
287 } else if resp.StatusCode < 500 && resp.StatusCode != 408 {
288 return fmt.Errorf("Unexpected response during file uploa d: HTTP %d", resp.StatusCode)
289 } else {
290 // Transient error. Need to query for latest uploaded of fset to resume.
291 offset = -1
292 reportTransientError()
293 }
294 }
295 }
296
297 // getNextOffset queries the storage for size of persisted data.
298 func getNextOffset(uploadURL string, length int64, client *http.Client) (offset int64, err error) {
299 r, err := http.NewRequest("PUT", uploadURL, nil)
300 if err != nil {
301 return
302 }
303 r.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", length))
304 r.Header.Set("Content-Length", "0")
305 r.Header.Set("User-Agent", userAgent())
306 resp, err := client.Do(r)
307 if err != nil {
308 return
309 }
310 resp.Body.Close()
311
312 if resp.StatusCode == 200 {
313 // Fully uploaded.
314 offset = length
315 } else if resp.StatusCode == 308 {
316 // Partially uploaded, extract last uploaded offset from Range h eader.
317 rangeHeader := resp.Header.Get("Range")
318 if rangeHeader != "" {
319 _, err = fmt.Sscanf(rangeHeader, "bytes=0-%d", &offset)
320 if err == nil {
321 // |offset| is an *offset* of a last uploaded by te, not the data length.
322 offset++
323 }
324 }
325 } else if resp.StatusCode < 500 && resp.StatusCode != 408 {
326 err = fmt.Errorf("Unexpected response (HTTP %d) when querying fo r uploaded offset", resp.StatusCode)
327 } else {
328 err = errTransientError
329 }
330 return
331 }
332
333 ////////////////////////////////////////////////////////////////////////////////
334 // Tags related functions.
335
336 // attachTagsWhenReady attaches tags to an instance retrying when receiving
337 // PROCESSING_NOT_FINISHED_YET errors.
338 func attachTagsWhenReady(remote *remoteService, packageName, instanceID string, tags []string, log logging.Logger) error {
339 if len(tags) == 0 {
340 return nil
341 }
342 for _, tag := range tags {
343 log.Infof("cipd: attaching tag %s", tag)
344 }
345 deadline := clock.Now().Add(60 * time.Second)
346 for clock.Now().Before(deadline) {
347 err := remote.attachTags(packageName, instanceID, tags)
348 if err == nil {
349 log.Infof("cipd: all tags attached")
350 return nil
351 }
352 if _, ok := err.(*pendingProcessingError); ok {
353 log.Warningf("cipd: package instance is not ready yet - %s", err)
354 clock.Sleep(5 * time.Second)
355 } else {
356 log.Errorf("cipd: failed to attach tags - %s", err)
357 return err
358 }
359 }
360 log.Errorf("cipd: failed to attach tags - deadline exceeded")
361 return ErrAttachTagsTimeout
362 }
OLDNEW
« no previous file with comments | « go/src/infra/tools/cipd/storage_test.go ('k') | go/src/infra/tools/cipd/uploader_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698