Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(52)

Side by Side Diff: client/isolatedclient/isolatedclient.go

Issue 1846263002: Isolate: Use generators instead of seekers (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Tweaks from comments. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/isolate/isolate.go ('k') | client/isolatedclient/isolatedclient_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package isolatedclient 5 package isolatedclient
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "io" 10 "io"
11 "io/ioutil" 11 "io/ioutil"
12 "log" 12 "log"
13 "net/http" 13 "net/http"
14 "strings" 14 "strings"
15 "sync"
16 15
17 "github.com/luci/luci-go/client/internal/lhttp" 16 "github.com/luci/luci-go/client/internal/lhttp"
18 "github.com/luci/luci-go/client/internal/retry" 17 "github.com/luci/luci-go/client/internal/retry"
19 "github.com/luci/luci-go/client/internal/tracer" 18 "github.com/luci/luci-go/client/internal/tracer"
20 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" 19 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1"
21 "github.com/luci/luci-go/common/isolated" 20 "github.com/luci/luci-go/common/isolated"
22 ) 21 )
23 22
23 // compressedBufSize is the size of the read buffer that will be used to pull
24 // data from a source into the compressor.
25 const compressedBufSize = 4096
26
27 // Source is a generator method to return source data. A generated Source must
28 // be Closed before the generator is called again.
29 type Source func() (io.ReadCloser, error)
30
31 // NewBytesSource returns a Source implementation that reads from the supplied
32 // byte slice.
33 func NewBytesSource(d []byte) Source {
34 return func() (io.ReadCloser, error) {
35 return ioutil.NopCloser(bytes.NewReader(d)), nil
36 }
37 }
38
// IsolateServer is the low-level client interface to interact with an Isolate
// server.
type IsolateServer interface {
	// ServerCapabilities returns the server details.
	ServerCapabilities() (*isolateservice.HandlersEndpointsV1ServerDetails, error)
	// Contains looks up cache presence on the server of multiple items.
	//
	// The returned list is in the same order as 'items', with entries nil for
	// items that were present.
	Contains(items []*isolateservice.HandlersEndpointsV1Digest) ([]*PushState, error)
	// Push uploads an item reported as missing by Contains, reading its data
	// from src. A fresh reader may be generated from src on each retry.
	Push(state *PushState, src Source) error
}
35 50
36 // PushState is per-item state passed from IsolateServer.Contains() to 51 // PushState is per-item state passed from IsolateServer.Contains() to
37 // IsolateServer.Push(). 52 // IsolateServer.Push().
38 // 53 //
39 // Its content is implementation specific. 54 // Its content is implementation specific.
40 type PushState struct { 55 type PushState struct {
41 status isolateservice.HandlersEndpointsV1PreuploadStatus 56 status isolateservice.HandlersEndpointsV1PreuploadStatus
42 digest isolated.HexDigest 57 digest isolated.HexDigest
43 size int64 58 size int64
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
110 index := int(e.Index) 125 index := int(e.Index)
111 out[index] = &PushState{ 126 out[index] = &PushState{
112 status: *e, 127 status: *e,
113 digest: isolated.HexDigest(items[index].Digest), 128 digest: isolated.HexDigest(items[index].Digest),
114 size: items[index].Size, 129 size: items[index].Size,
115 } 130 }
116 } 131 }
117 return out, nil 132 return out, nil
118 } 133 }
119 134
120 func (i *isolateServer) Push(state *PushState, src io.ReadSeeker) (err error) { 135 func (i *isolateServer) Push(state *PushState, source Source) (err error) {
121 // This push operation may be a retry after failed finalization call bel ow, 136 // This push operation may be a retry after failed finalization call bel ow,
122 // no need to reupload contents in that case. 137 // no need to reupload contents in that case.
123 if !state.uploaded { 138 if !state.uploaded {
124 // PUT file to uploadURL. 139 // PUT file to uploadURL.
125 » » if err = i.doPush(state, src); err != nil { 140 » » if err = i.doPush(state, source); err != nil {
126 log.Printf("doPush(%s) failed: %s\n%#v", state.digest, e rr, state) 141 log.Printf("doPush(%s) failed: %s\n%#v", state.digest, e rr, state)
127 return 142 return
128 } 143 }
129 state.uploaded = true 144 state.uploaded = true
130 } 145 }
131 146
132 // Optionally notify the server that it's done. 147 // Optionally notify the server that it's done.
133 if state.status.GsUploadUrl != "" { 148 if state.status.GsUploadUrl != "" {
134 end := tracer.Span(i, "finalize", nil) 149 end := tracer.Span(i, "finalize", nil)
135 defer func() { end(tracer.Args{"err": err}) }() 150 defer func() { end(tracer.Args{"err": err}) }()
136 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and 151 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
137 // send it to isolated server. That way isolate server can verif y that 152 // send it to isolated server. That way isolate server can verif y that
138 // the data safely reached Google Storage (GS provides MD5 and C RC32C of 153 // the data safely reached Google Storage (GS provides MD5 and C RC32C of
139 // stored files). 154 // stored files).
140 in := isolateservice.HandlersEndpointsV1FinalizeRequest{UploadTi cket: state.status.UploadTicket} 155 in := isolateservice.HandlersEndpointsV1FinalizeRequest{UploadTi cket: state.status.UploadTicket}
141 headers := map[string]string{"Cache-Control": "public, max-age=3 1536000"} 156 headers := map[string]string{"Cache-Control": "public, max-age=3 1536000"}
142 if err = i.postJSON("/_ah/api/isolateservice/v1/finalize_gs_uplo ad", headers, in, nil); err != nil { 157 if err = i.postJSON("/_ah/api/isolateservice/v1/finalize_gs_uplo ad", headers, in, nil); err != nil {
143 log.Printf("Push(%s) (finalize) failed: %s\n%#v", state. digest, err, state) 158 log.Printf("Push(%s) (finalize) failed: %s\n%#v", state. digest, err, state)
144 return 159 return
145 } 160 }
146 } 161 }
147 state.finalized = true 162 state.finalized = true
148 return 163 return
149 } 164 }
150 165
151 func (i *isolateServer) doPush(state *PushState, src io.ReadSeeker) (err error) { 166 func (i *isolateServer) doPush(state *PushState, source Source) (err error) {
152 useDB := state.status.GsUploadUrl == "" 167 useDB := state.status.GsUploadUrl == ""
153 end := tracer.Span(i, "push", tracer.Args{"useDB": useDB, "size": state. size}) 168 end := tracer.Span(i, "push", tracer.Args{"useDB": useDB, "size": state. size})
154 defer func() { end(tracer.Args{"err": err}) }() 169 defer func() { end(tracer.Args{"err": err}) }()
155 if useDB { 170 if useDB {
171 src, err := source()
172 if err != nil {
173 return err
174 }
175 defer src.Close()
176
156 err = i.doPushDB(state, src) 177 err = i.doPushDB(state, src)
157 } else { 178 } else {
158 » » err = i.doPushGCS(state, src) 179 » » err = i.doPushGCS(state, source)
159 } 180 }
160 if err != nil { 181 if err != nil {
161 tracer.CounterAdd(i, "bytesUploaded", float64(state.size)) 182 tracer.CounterAdd(i, "bytesUploaded", float64(state.size))
162 } 183 }
163 return err 184 return err
164 } 185 }
165 186
166 func (i *isolateServer) doPushDB(state *PushState, reader io.Reader) error { 187 func (i *isolateServer) doPushDB(state *PushState, reader io.Reader) error {
167 buf := bytes.Buffer{} 188 buf := bytes.Buffer{}
168 compressor := isolated.GetCompressor(&buf) 189 compressor := isolated.GetCompressor(&buf)
169 if _, err := io.Copy(compressor, reader); err != nil { 190 if _, err := io.Copy(compressor, reader); err != nil {
170 return err 191 return err
171 } 192 }
172 if err := compressor.Close(); err != nil { 193 if err := compressor.Close(); err != nil {
173 return err 194 return err
174 } 195 }
175 in := &isolateservice.HandlersEndpointsV1StorageRequest{UploadTicket: st ate.status.UploadTicket, Content: buf.Bytes()} 196 in := &isolateservice.HandlersEndpointsV1StorageRequest{UploadTicket: st ate.status.UploadTicket, Content: buf.Bytes()}
176 return i.postJSON("/_ah/api/isolateservice/v1/store_inline", nil, in, ni l) 197 return i.postJSON("/_ah/api/isolateservice/v1/store_inline", nil, in, ni l)
177 } 198 }
178 199
179 func (i *isolateServer) doPushGCS(state *PushState, src io.ReadSeeker) (err erro r) { 200 func (i *isolateServer) doPushGCS(state *PushState, source Source) (err error) {
180 » c := newCompressed(src)
181 » defer func() {
182 » » if err1 := c.Close(); err == nil {
183 » » » err = err1
184 » » }
185 » }()
186 // GsUploadUrl is signed Google Storage URL that doesn't require additio nal 201 // GsUploadUrl is signed Google Storage URL that doesn't require additio nal
187 // authentication. In fact, using authClient causes HTTP 403 because 202 // authentication. In fact, using authClient causes HTTP 403 because
188 // authClient's tokens don't have Cloud Storage OAuth scope. Use anonymo us 203 // authClient's tokens don't have Cloud Storage OAuth scope. Use anonymo us
189 // client instead. 204 // client instead.
190 » request, err2 := http.NewRequest("PUT", state.status.GsUploadUrl, c) 205 » req := lhttp.NewRequest(i.anonClient, func() (*http.Request, error) {
191 » if err2 != nil { 206 » » src, err := source()
192 » » return err2 207 » » if err != nil {
193 » } 208 » » » return nil, err
194 » request.Header.Set("Content-Type", "application/octet-stream") 209 » » }
195 » req, err3 := lhttp.NewRequest(i.anonClient, request, func(resp *http.Res ponse) error { 210
211 » » request, err := http.NewRequest("PUT", state.status.GsUploadUrl, nil)
212 » » if err != nil {
213 » » » src.Close()
214 » » » return nil, err
215 » » }
216 » » request.Body = newCompressed(src)
217 » » request.Header.Set("Content-Type", "application/octet-stream")
218 » » return request, nil
219 » }, func(resp *http.Response) error {
196 _, err4 := io.Copy(ioutil.Discard, resp.Body) 220 _, err4 := io.Copy(ioutil.Discard, resp.Body)
197 err5 := resp.Body.Close() 221 err5 := resp.Body.Close()
198 if err4 != nil { 222 if err4 != nil {
199 return err4 223 return err4
200 } 224 }
201 return err5 225 return err5
202 }) 226 })
203 if err3 != nil {
204 return err3
205 }
206 return i.config.Do(req) 227 return i.config.Do(req)
207 } 228 }
208 229
209 // compressed transparently compresses a source. 230 // compressed is an io.ReadCloser that transparently compresses source data in
210 // 231 // a separate goroutine.
211 // It supports seeking to the beginning of the file to enable re-reading the
212 // file multiple times. This is needed for HTTP retries.
213 type compressed struct { 232 type compressed struct {
214 » src io.ReadSeeker 233 » io.ReadCloser
215 » wg sync.WaitGroup
216 » r io.ReadCloser
217 } 234 }
218 235
219 func newCompressed(src io.ReadSeeker) *compressed { 236 func newCompressed(src io.Reader) *compressed {
220 » c := &compressed{src: src} 237 » pr, pw := io.Pipe()
221 » c.reset()
222 » return c
223 }
224
225 func (c *compressed) Close() error {
226 » var err error
227 » if c.r != nil {
228 » » err = c.r.Close()
229 » » c.r = nil
230 » }
231 » c.wg.Wait()
232 » return err
233 }
234
235 // Seek resets the compressor.
236 func (c *compressed) Seek(offset int64, whence int) (int64, error) {
237 » if offset != 0 || whence != 0 {
238 » » return 0, errors.New("compressed can only seek to 0")
239 » }
240 » err1 := c.Close()
241 » n, err2 := c.src.Seek(0, 0)
242 » c.reset()
243 » if err1 != nil {
244 » » return n, err1
245 » }
246 » return n, err2
247 }
248
249 func (c *compressed) Read(p []byte) (int, error) {
250 » return c.r.Read(p)
251 }
252
253 // reset restarts the compression loop.
254 func (c *compressed) reset() {
255 » var w *io.PipeWriter
256 » c.r, w = io.Pipe()
257 » c.wg.Add(1)
258 go func() { 238 go func() {
259 // The compressor itself is not thread safe. 239 // The compressor itself is not thread safe.
260 » » defer c.wg.Done() 240 » » compressor := isolated.GetCompressor(pw)
261 » » compressor := isolated.GetCompressor(w) 241
262 » » _, err := io.Copy(compressor, c.src) 242 » » buf := make([]byte, compressedBufSize)
263 » » if err2 := compressor.Close(); err == nil { 243 » » pw.CloseWithError(func() error {
264 » » » err = err2 244 » » » if _, err := io.CopyBuffer(compressor, src, buf); err != nil {
M-A Ruel 2016/08/19 17:23:34 I was rereading this code and do not understand wh
dnj 2016/08/19 22:33:14 I think my intent was to be able to control the bu
265 » » } 245 » » » » return err
266 » » w.CloseWithError(err) 246 » » » }
247 » » » return compressor.Close()
248 » » }())
267 }() 249 }()
250
251 return &compressed{pr}
268 } 252 }
OLDNEW
« no previous file with comments | « client/isolate/isolate.go ('k') | client/isolatedclient/isolatedclient_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698