OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package cipd | 5 package cipd |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "fmt" | 9 "fmt" |
10 "io" | 10 "io" |
11 "net/http" | 11 "net/http" |
12 "os" | 12 "os" |
13 "time" | 13 "time" |
14 | 14 |
15 "infra/libs/logging" | 15 "infra/libs/logging" |
16 ) | 16 ) |
17 | 17 |
18 var ( | 18 var ( |
19 // ErrFinalizationTimeout is returned if CAS service can not finalize up load fast enough. | 19 // ErrFinalizationTimeout is returned if CAS service can not finalize up load fast enough. |
20 ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS servi ce to finalize the upload") | 20 ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS servi ce to finalize the upload") |
21 // ErrAttachTagsTimeout is returned when service refuses to accept tags for a long time. | |
22 ErrAttachTagsTimeout = errors.New("Timeout while attaching tags") | |
21 ) | 23 ) |
22 | 24 |
23 // UploadOptions contains upload related parameters shared by UploadToCAS and | 25 // UploadOptions contains upload related parameters shared by UploadToCAS and |
24 // RegisterInstance functions. | 26 // RegisterInstance functions. |
25 type UploadOptions struct { | 27 type UploadOptions struct { |
26 // ServiceURL is root URL of the backend service, or "" to use default s ervice. | 28 // ServiceURL is root URL of the backend service, or "" to use default s ervice. |
27 ServiceURL string | 29 ServiceURL string |
28 // FinalizationTimeout is how long to wait for CAS service to finalize t he upload, default is 1 min. | 30 // FinalizationTimeout is how long to wait for CAS service to finalize t he upload, default is 1 min. |
29 FinalizationTimeout time.Duration | 31 FinalizationTimeout time.Duration |
30 // Client is http.Client to use to make requests, default is http.Defaul tClient. | 32 // Client is http.Client to use to make requests, default is http.Defaul tClient. |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
128 if delay < 4*time.Second { | 130 if delay < 4*time.Second { |
129 delay += 500 * time.Millisecond | 131 delay += 500 * time.Millisecond |
130 } | 132 } |
131 } | 133 } |
132 } | 134 } |
133 | 135 |
134 // RegisterInstanceOptions contains parameters for RegisterInstance function. | 136 // RegisterInstanceOptions contains parameters for RegisterInstance function. |
135 type RegisterInstanceOptions struct { | 137 type RegisterInstanceOptions struct { |
136 UploadOptions | 138 UploadOptions |
137 | 139 |
138 » // PackageInstance is a package to upload. | 140 » // PackageInstance is a package instance to register. |
139 PackageInstance PackageInstance | 141 PackageInstance PackageInstance |
142 // Tags is a list of tags to attach to an instance. Will be attached eve n if | |
143 // instance already existed before. | |
nodir
2015/05/07 04:19:02
english: the instance
Vadim Sh.
2015/05/07 05:07:58
Done.
| |
144 Tags []string | |
140 } | 145 } |
141 | 146 |
142 // RegisterInstance makes the package instance available for clients by | 147 // RegisterInstance makes the package instance available for clients by |
143 // uploading it to the storage and registering it in the package repository. | 148 // uploading it to the storage and registering it in the package repository. |
144 func RegisterInstance(options RegisterInstanceOptions) error { | 149 func RegisterInstance(options RegisterInstanceOptions) error { |
145 // Fill in default options. | 150 // Fill in default options. |
146 if options.ServiceURL == "" { | 151 if options.ServiceURL == "" { |
147 options.ServiceURL = DefaultServiceURL() | 152 options.ServiceURL = DefaultServiceURL() |
148 } | 153 } |
149 if options.Client == nil { | 154 if options.Client == nil { |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
187 | 192 |
188 if result.AlreadyRegistered { | 193 if result.AlreadyRegistered { |
189 log.Infof( | 194 log.Infof( |
190 "cipd: instance %s:%s is already registered by %s on %s" , | 195 "cipd: instance %s:%s is already registered by %s on %s" , |
191 inst.PackageName(), inst.InstanceID(), | 196 inst.PackageName(), inst.InstanceID(), |
192 result.Info.RegisteredBy, result.Info.RegisteredTs) | 197 result.Info.RegisteredBy, result.Info.RegisteredTs) |
193 } else { | 198 } else { |
194 log.Infof("cipd: instance %s:%s was successfully registered", in st.PackageName(), inst.InstanceID()) | 199 log.Infof("cipd: instance %s:%s was successfully registered", in st.PackageName(), inst.InstanceID()) |
195 } | 200 } |
196 | 201 |
197 » return nil | 202 » return attachTagsWhenReady(remote, inst.PackageName(), inst.InstanceID() , options.Tags, log) |
198 } | 203 } |
199 | 204 |
200 //////////////////////////////////////////////////////////////////////////////// | 205 //////////////////////////////////////////////////////////////////////////////// |
201 // Google Storage resumable upload protocol. | 206 // Google Storage resumable upload protocol. |
202 // See https://cloud.google.com/storage/docs/concepts-techniques#resumable | 207 // See https://cloud.google.com/storage/docs/concepts-techniques#resumable |
203 | 208 |
204 // errTransientError is returned by getNextOffset in case of retryable error. | 209 // errTransientError is returned by getNextOffset in case of retryable error. |
205 var errTransientError = errors.New("Transient error in getUploadedOffset") | 210 var errTransientError = errors.New("Transient error in getUploadedOffset") |
206 | 211 |
207 // resumableUpload is mocked in tests. | 212 // resumableUpload is mocked in tests. |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
317 offset++ | 322 offset++ |
318 } | 323 } |
319 } | 324 } |
320 } else if resp.StatusCode < 500 && resp.StatusCode != 408 { | 325 } else if resp.StatusCode < 500 && resp.StatusCode != 408 { |
321 err = fmt.Errorf("Unexpected response (HTTP %d) when querying fo r uploaded offset", resp.StatusCode) | 326 err = fmt.Errorf("Unexpected response (HTTP %d) when querying fo r uploaded offset", resp.StatusCode) |
322 } else { | 327 } else { |
323 err = errTransientError | 328 err = errTransientError |
324 } | 329 } |
325 return | 330 return |
326 } | 331 } |
332 | |
333 //////////////////////////////////////////////////////////////////////////////// | |
334 // Tags related functions. | |
335 | |
336 // attachTagsWhenReady attaches tags to an instance retrying when receiving | |
337 // PROCESSING_NOT_FINISHED_YET errors. | |
338 func attachTagsWhenReady(remote *remoteService, packageName, instanceID string, tags []string, log logging.Logger) error { | |
339 if len(tags) == 0 { | |
340 return nil | |
341 } | |
342 for _, tag := range tags { | |
343 log.Infof("cipd: attaching tag %s", tag) | |
344 } | |
345 deadline := clock.Now().Add(60 * time.Second) | |
346 for clock.Now().Before(deadline) { | |
347 err := remote.attachTags(packageName, instanceID, tags) | |
348 if err == nil { | |
349 log.Infof("cipd: all tags attached") | |
350 return nil | |
351 } | |
352 if _, ok := err.(*pendingProcessingError); ok { | |
353 log.Warningf("cipd: package instance is not ready yet - %s", err) | |
354 clock.Sleep(5 * time.Second) | |
355 } else { | |
356 log.Errorf("cipd: failed to attach tags - %s", err) | |
357 return err | |
358 } | |
359 } | |
360 log.Errorf("cipd: failed to attach tags - deadline exceeded") | |
361 return ErrAttachTagsTimeout | |
362 } | |
OLD | NEW |