Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(345)

Side by Side Diff: go/src/infra/tools/cipd/fetcher.go

Issue 1129043003: cipd: Refactor client to make it more readable. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « go/src/infra/tools/cipd/ensure_test.go ('k') | go/src/infra/tools/cipd/fetcher_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package cipd
6
7 import (
8 "fmt"
9 "io"
10 "io/ioutil"
11 "net/http"
12 "os"
13 "path/filepath"
14 "time"
15
16 "infra/libs/logging"
17 )
18
19 // FetchInstanceOptions contains parameters for FetchInstance function.
20 type FetchInstanceOptions struct {
21 // ServiceURL is root URL of the backend service, or "" to use default s ervice.
22 ServiceURL string
23 // Client is http.Client to use to make requests, default is http.Defaul tClient.
24 Client *http.Client
25 // Log is a logger to use for logs, default is logging.DefaultLogger.
26 Log logging.Logger
27
28 // PackageName is a name of the package to fetch.
29 PackageName string
30 // InstanceID identifies an instance of the package to fetch.
31 InstanceID string
32 // Output is where to write the fetched data to. Must be nil when used w ith FetchAndDeployInstance.
33 Output io.WriteSeeker
34 }
35
36 // FetchInstance downloads package instance file from the repository.
37 func FetchInstance(options FetchInstanceOptions) (err error) {
38 // Fill in default options.
39 if options.ServiceURL == "" {
40 options.ServiceURL = DefaultServiceURL()
41 }
42 if options.Client == nil {
43 options.Client = http.DefaultClient
44 }
45 if options.Log == nil {
46 options.Log = logging.DefaultLogger
47 }
48 log := options.Log
49 remote := newRemoteService(options.Client, options.ServiceURL, log)
50
51 // Logs the error before returning it.
52 defer func() {
53 if err != nil {
54 log.Errorf("cipd: failed to fetch %s (%s)", options.Pack ageName, err)
55 }
56 }()
57
58 // Grab fetch URL.
59 log.Infof("cipd: resolving %s:%s", options.PackageName, options.Instance ID)
60 fetchInfo, err := remote.fetchInstance(options.PackageName, options.Inst anceID)
61 if err != nil {
62 return
63 }
64
65 // reportProgress print fetch progress, throttling the reports rate.
66 var prevProgress int64 = 1000
67 var prevReportTs time.Time
68 reportProgress := func(read int64, total int64) {
69 now := time.Now()
70 progress := read * 100 / total
71 if progress < prevProgress || read == total || now.Sub(prevRepor tTs) > 5*time.Second {
72 log.Infof("cipd: fetching %s: %d%%", options.InstanceID, progress)
73 prevReportTs = now
74 prevProgress = progress
75 }
76 }
77
78 // download is a separate function to be able to use deferred close.
79 download := func(out io.WriteSeeker, src io.ReadCloser, totalLen int64) error {
80 defer src.Close()
81 log.Infof("cipd: fetching %s (%.1f Mb)", options.InstanceID, flo at32(totalLen)/1024.0/1024.0)
82 reportProgress(0, totalLen)
83 _, err := io.Copy(out, &readerWithProgress{
84 reader: src,
85 callback: func(read int64) { reportProgress(read, totalL en) },
86 })
87 if err == nil {
88 log.Infof("cipd: successfully fetched %s", options.Insta nceID)
89 }
90 return err
91 }
92
93 // Download the actual data (several attempts).
94 maxAttempts := 10
95 for attempt := 0; attempt < maxAttempts; attempt++ {
96 // Rewind output to zero offset.
97 _, err = options.Output.Seek(0, os.SEEK_SET)
98 if err != nil {
99 return
100 }
101
102 // Send the request.
103 log.Infof("cipd: initiating the fetch")
104 var req *http.Request
105 var resp *http.Response
106 req, err = http.NewRequest("GET", fetchInfo.FetchURL, nil)
107 if err != nil {
108 return
109 }
110 req.Header.Set("User-Agent", userAgent())
111 resp, err = options.Client.Do(req)
112 if err != nil {
113 return
114 }
115
116 // Transient error, retry.
117 if resp.StatusCode == 408 || resp.StatusCode >= 500 {
118 resp.Body.Close()
119 log.Warningf("cipd: transient HTTP error %d while fetchi ng the file", resp.StatusCode)
120 continue
121 }
122
123 // Fatal error, abort.
124 if resp.StatusCode >= 400 {
125 resp.Body.Close()
126 return fmt.Errorf("Server replied with HTTP code %d", re sp.StatusCode)
127 }
128
129 // Try to fetch.
130 err = download(options.Output, resp.Body, resp.ContentLength)
131 if err != nil {
132 log.Warningf("cipd: transient error fetching the file: % s", err)
133 continue
134 }
135
136 // Success.
137 err = nil
138 return
139 }
140
141 err = fmt.Errorf("All %d fetch attempts failed", maxAttempts)
142 return
143 }
144
145 // FetchAndDeployInstance fetches the package instance and deploys it into
146 // a site root. It doesn't check whether the instance is already deployed.
147 // options.Output field is not used and must be set to nil.
148 func FetchAndDeployInstance(root string, options FetchInstanceOptions) error {
149 // Be paranoid.
150 err := ValidatePackageName(options.PackageName)
151 if err != nil {
152 return err
153 }
154 err = ValidateInstanceID(options.InstanceID)
155 if err != nil {
156 return err
157 }
158
159 // This field is not supported.
160 if options.Output != nil {
161 return fmt.Errorf("Passed non-nil Output to FetchAndDeployInstan ce")
162 }
163
164 // Use temp file for storing package file. Delete it when done.
165 var instance PackageInstance
166 tempPath := filepath.Join(root, siteServiceDir, "tmp")
167 err = os.MkdirAll(tempPath, 0777)
168 if err != nil {
169 return err
170 }
171 f, err := ioutil.TempFile(tempPath, options.InstanceID)
172 if err != nil {
173 return err
174 }
175 defer func() {
176 // Instance takes ownership of the file, no need to close it sep arately.
177 if instance == nil {
178 f.Close()
179 }
180 os.Remove(f.Name())
181 }()
182
183 // Fetch the package data to the provided storage.
184 options.Output = f
185 err = FetchInstance(options)
186 if err != nil {
187 return err
188 }
189
190 // Open the instance, verify the instance ID.
191 instance, err = OpenInstance(f, options.InstanceID)
192 if err != nil {
193 return err
194 }
195 defer instance.Close()
196
197 // Deploy it. 'defer' will take care of removing the temp file if needed .
198 _, err = DeployInstance(root, instance)
199 return err
200 }
201
202 ////////////////////////////////////////////////////////////////////////////////
203
204 // readerWithProgress is io.Reader that calls callback whenever something is
205 // read from it.
206 type readerWithProgress struct {
207 reader io.Reader
208 total int64
209 callback func(total int64)
210 }
211
212 func (r *readerWithProgress) Read(p []byte) (int, error) {
213 n, err := r.reader.Read(p)
214 r.total += int64(n)
215 r.callback(r.total)
216 return n, err
217 }
OLDNEW
« no previous file with comments | « go/src/infra/tools/cipd/ensure_test.go ('k') | go/src/infra/tools/cipd/fetcher_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698