Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package isolatedclient | 5 package isolatedclient |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "io" | 10 "io" |
| 11 "io/ioutil" | 11 "io/ioutil" |
| 12 "log" | 12 "log" |
| 13 "net/http" | 13 "net/http" |
| 14 "strings" | 14 "strings" |
| 15 "sync" | |
| 16 | 15 |
| 17 "github.com/luci/luci-go/client/internal/lhttp" | 16 "github.com/luci/luci-go/client/internal/lhttp" |
| 18 "github.com/luci/luci-go/client/internal/retry" | 17 "github.com/luci/luci-go/client/internal/retry" |
| 19 "github.com/luci/luci-go/client/internal/tracer" | 18 "github.com/luci/luci-go/client/internal/tracer" |
| 20 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" | 19 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" |
| 21 "github.com/luci/luci-go/common/isolated" | 20 "github.com/luci/luci-go/common/isolated" |
| 22 ) | 21 ) |
| 23 | 22 |
| 23 // compressedBufSize is the size of the read buffer that will be used to pull | |
| 24 // data from a source into the compressor. | |
| 25 const compressedBufSize = 4096 | |
| 26 | |
| 27 // Source is a generator method to return source data. A generated Source must | |
| 28 // be Closed before the generator is called again. | |
| 29 type Source func() (io.ReadCloser, error) | |
| 30 | |
| 31 // NewBytesSource returns a Source implementation that reads from the supplied | |
| 32 // byte slice. | |
| 33 func NewBytesSource(d []byte) Source { | |
| 34 return func() (io.ReadCloser, error) { | |
| 35 return ioutil.NopCloser(bytes.NewReader(d)), nil | |
| 36 } | |
| 37 } | |
| 38 | |
| 24 // IsolateServer is the low-level client interface to interact with an Isolate | 39 // IsolateServer is the low-level client interface to interact with an Isolate |
| 25 // server. | 40 // server. |
| 26 type IsolateServer interface { | 41 type IsolateServer interface { |
| 27 ServerCapabilities() (*isolateservice.HandlersEndpointsV1ServerDetails, error) | 42 ServerCapabilities() (*isolateservice.HandlersEndpointsV1ServerDetails, error) |
| 28 // Contains looks up cache presence on the server of multiple items. | 43 // Contains looks up cache presence on the server of multiple items. |
| 29 // | 44 // |
| 30 // The returned list is in the same order as 'items', with entries nil f or | 45 // The returned list is in the same order as 'items', with entries nil f or |
| 31 // items that were present. | 46 // items that were present. |
| 32 Contains(items []*isolateservice.HandlersEndpointsV1Digest) ([]*PushStat e, error) | 47 Contains(items []*isolateservice.HandlersEndpointsV1Digest) ([]*PushStat e, error) |
| 33 » Push(state *PushState, src io.ReadSeeker) error | 48 » Push(state *PushState, src Source) error |
| 34 } | 49 } |
| 35 | 50 |
| 36 // PushState is per-item state passed from IsolateServer.Contains() to | 51 // PushState is per-item state passed from IsolateServer.Contains() to |
| 37 // IsolateServer.Push(). | 52 // IsolateServer.Push(). |
| 38 // | 53 // |
| 39 // Its content is implementation specific. | 54 // Its content is implementation specific. |
| 40 type PushState struct { | 55 type PushState struct { |
| 41 status isolateservice.HandlersEndpointsV1PreuploadStatus | 56 status isolateservice.HandlersEndpointsV1PreuploadStatus |
| 42 digest isolated.HexDigest | 57 digest isolated.HexDigest |
| 43 size int64 | 58 size int64 |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 110 index := int(e.Index) | 125 index := int(e.Index) |
| 111 out[index] = &PushState{ | 126 out[index] = &PushState{ |
| 112 status: *e, | 127 status: *e, |
| 113 digest: isolated.HexDigest(items[index].Digest), | 128 digest: isolated.HexDigest(items[index].Digest), |
| 114 size: items[index].Size, | 129 size: items[index].Size, |
| 115 } | 130 } |
| 116 } | 131 } |
| 117 return out, nil | 132 return out, nil |
| 118 } | 133 } |
| 119 | 134 |
| 120 func (i *isolateServer) Push(state *PushState, src io.ReadSeeker) (err error) { | 135 func (i *isolateServer) Push(state *PushState, source Source) (err error) { |
| 121 // This push operation may be a retry after failed finalization call bel ow, | 136 // This push operation may be a retry after failed finalization call bel ow, |
| 122 // no need to reupload contents in that case. | 137 // no need to reupload contents in that case. |
| 123 if !state.uploaded { | 138 if !state.uploaded { |
| 124 // PUT file to uploadURL. | 139 // PUT file to uploadURL. |
| 125 » » if err = i.doPush(state, src); err != nil { | 140 » » if err = i.doPush(state, source); err != nil { |
| 126 log.Printf("doPush(%s) failed: %s\n%#v", state.digest, e rr, state) | 141 log.Printf("doPush(%s) failed: %s\n%#v", state.digest, e rr, state) |
| 127 return | 142 return |
| 128 } | 143 } |
| 129 state.uploaded = true | 144 state.uploaded = true |
| 130 } | 145 } |
| 131 | 146 |
| 132 // Optionally notify the server that it's done. | 147 // Optionally notify the server that it's done. |
| 133 if state.status.GsUploadUrl != "" { | 148 if state.status.GsUploadUrl != "" { |
| 134 end := tracer.Span(i, "finalize", nil) | 149 end := tracer.Span(i, "finalize", nil) |
| 135 defer func() { end(tracer.Args{"err": err}) }() | 150 defer func() { end(tracer.Args{"err": err}) }() |
| 136 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and | 151 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and |
| 137 // send it to isolated server. That way isolate server can verif y that | 152 // send it to isolated server. That way isolate server can verif y that |
| 138 // the data safely reached Google Storage (GS provides MD5 and C RC32C of | 153 // the data safely reached Google Storage (GS provides MD5 and C RC32C of |
| 139 // stored files). | 154 // stored files). |
| 140 in := isolateservice.HandlersEndpointsV1FinalizeRequest{UploadTi cket: state.status.UploadTicket} | 155 in := isolateservice.HandlersEndpointsV1FinalizeRequest{UploadTi cket: state.status.UploadTicket} |
| 141 headers := map[string]string{"Cache-Control": "public, max-age=3 1536000"} | 156 headers := map[string]string{"Cache-Control": "public, max-age=3 1536000"} |
| 142 if err = i.postJSON("/_ah/api/isolateservice/v1/finalize_gs_uplo ad", headers, in, nil); err != nil { | 157 if err = i.postJSON("/_ah/api/isolateservice/v1/finalize_gs_uplo ad", headers, in, nil); err != nil { |
| 143 log.Printf("Push(%s) (finalize) failed: %s\n%#v", state. digest, err, state) | 158 log.Printf("Push(%s) (finalize) failed: %s\n%#v", state. digest, err, state) |
| 144 return | 159 return |
| 145 } | 160 } |
| 146 } | 161 } |
| 147 state.finalized = true | 162 state.finalized = true |
| 148 return | 163 return |
| 149 } | 164 } |
| 150 | 165 |
| 151 func (i *isolateServer) doPush(state *PushState, src io.ReadSeeker) (err error) { | 166 func (i *isolateServer) doPush(state *PushState, source Source) (err error) { |
| 152 useDB := state.status.GsUploadUrl == "" | 167 useDB := state.status.GsUploadUrl == "" |
| 153 end := tracer.Span(i, "push", tracer.Args{"useDB": useDB, "size": state. size}) | 168 end := tracer.Span(i, "push", tracer.Args{"useDB": useDB, "size": state. size}) |
| 154 defer func() { end(tracer.Args{"err": err}) }() | 169 defer func() { end(tracer.Args{"err": err}) }() |
| 155 if useDB { | 170 if useDB { |
| 171 src, err := source() | |
| 172 if err != nil { | |
| 173 return err | |
| 174 } | |
| 175 defer src.Close() | |
| 176 | |
| 156 err = i.doPushDB(state, src) | 177 err = i.doPushDB(state, src) |
| 157 } else { | 178 } else { |
| 158 » » err = i.doPushGCS(state, src) | 179 » » err = i.doPushGCS(state, source) |
| 159 } | 180 } |
| 160 if err != nil { | 181 if err != nil { |
| 161 tracer.CounterAdd(i, "bytesUploaded", float64(state.size)) | 182 tracer.CounterAdd(i, "bytesUploaded", float64(state.size)) |
| 162 } | 183 } |
| 163 return err | 184 return err |
| 164 } | 185 } |
| 165 | 186 |
| 166 func (i *isolateServer) doPushDB(state *PushState, reader io.Reader) error { | 187 func (i *isolateServer) doPushDB(state *PushState, reader io.Reader) error { |
| 167 buf := bytes.Buffer{} | 188 buf := bytes.Buffer{} |
| 168 compressor := isolated.GetCompressor(&buf) | 189 compressor := isolated.GetCompressor(&buf) |
| 169 if _, err := io.Copy(compressor, reader); err != nil { | 190 if _, err := io.Copy(compressor, reader); err != nil { |
| 170 return err | 191 return err |
| 171 } | 192 } |
| 172 if err := compressor.Close(); err != nil { | 193 if err := compressor.Close(); err != nil { |
| 173 return err | 194 return err |
| 174 } | 195 } |
| 175 in := &isolateservice.HandlersEndpointsV1StorageRequest{UploadTicket: st ate.status.UploadTicket, Content: buf.Bytes()} | 196 in := &isolateservice.HandlersEndpointsV1StorageRequest{UploadTicket: st ate.status.UploadTicket, Content: buf.Bytes()} |
| 176 return i.postJSON("/_ah/api/isolateservice/v1/store_inline", nil, in, ni l) | 197 return i.postJSON("/_ah/api/isolateservice/v1/store_inline", nil, in, ni l) |
| 177 } | 198 } |
| 178 | 199 |
| 179 func (i *isolateServer) doPushGCS(state *PushState, src io.ReadSeeker) (err erro r) { | 200 func (i *isolateServer) doPushGCS(state *PushState, source Source) (err error) { |
| 180 » c := newCompressed(src) | |
| 181 » defer func() { | |
| 182 » » if err1 := c.Close(); err == nil { | |
| 183 » » » err = err1 | |
| 184 » » } | |
| 185 » }() | |
| 186 // GsUploadUrl is signed Google Storage URL that doesn't require additio nal | 201 // GsUploadUrl is signed Google Storage URL that doesn't require additio nal |
| 187 // authentication. In fact, using authClient causes HTTP 403 because | 202 // authentication. In fact, using authClient causes HTTP 403 because |
| 188 // authClient's tokens don't have Cloud Storage OAuth scope. Use anonymo us | 203 // authClient's tokens don't have Cloud Storage OAuth scope. Use anonymo us |
| 189 // client instead. | 204 // client instead. |
| 190 » request, err2 := http.NewRequest("PUT", state.status.GsUploadUrl, c) | 205 » req := lhttp.NewRequest(i.anonClient, func() (*http.Request, error) { |
| 191 » if err2 != nil { | 206 » » src, err := source() |
| 192 » » return err2 | 207 » » if err != nil { |
| 193 » } | 208 » » » return nil, err |
| 194 » request.Header.Set("Content-Type", "application/octet-stream") | 209 » » } |
| 195 » req, err3 := lhttp.NewRequest(i.anonClient, request, func(resp *http.Res ponse) error { | 210 |
| 211 » » request, err := http.NewRequest("PUT", state.status.GsUploadUrl, nil) | |
| 212 » » if err != nil { | |
| 213 » » » src.Close() | |
| 214 » » » return nil, err | |
| 215 » » } | |
| 216 » » request.Body = newCompressed(src) | |
| 217 » » request.Header.Set("Content-Type", "application/octet-stream") | |
| 218 » » return request, nil | |
| 219 » }, func(resp *http.Response) error { | |
| 196 _, err4 := io.Copy(ioutil.Discard, resp.Body) | 220 _, err4 := io.Copy(ioutil.Discard, resp.Body) |
| 197 err5 := resp.Body.Close() | 221 err5 := resp.Body.Close() |
| 198 if err4 != nil { | 222 if err4 != nil { |
| 199 return err4 | 223 return err4 |
| 200 } | 224 } |
| 201 return err5 | 225 return err5 |
| 202 }) | 226 }) |
| 203 if err3 != nil { | |
| 204 return err3 | |
| 205 } | |
| 206 return i.config.Do(req) | 227 return i.config.Do(req) |
| 207 } | 228 } |
| 208 | 229 |
| 209 // compressed transparently compresses a source. | 230 // compressed is an io.ReadCloser that transparently compresses source data in |
| 210 // | 231 // a separate goroutine. |
| 211 // It supports seeking to the beginning of the file to enable re-reading the | |
| 212 // file multiple times. This is needed for HTTP retries. | |
| 213 type compressed struct { | 232 type compressed struct { |
| 214 » src io.ReadSeeker | 233 » io.ReadCloser |
| 215 » wg sync.WaitGroup | |
| 216 » r io.ReadCloser | |
| 217 } | 234 } |
| 218 | 235 |
| 219 func newCompressed(src io.ReadSeeker) *compressed { | 236 func newCompressed(src io.Reader) *compressed { |
| 220 » c := &compressed{src: src} | 237 » pr, pw := io.Pipe() |
| 221 » c.reset() | |
| 222 » return c | |
| 223 } | |
| 224 | |
| 225 func (c *compressed) Close() error { | |
| 226 » var err error | |
| 227 » if c.r != nil { | |
| 228 » » err = c.r.Close() | |
| 229 » » c.r = nil | |
| 230 » } | |
| 231 » c.wg.Wait() | |
| 232 » return err | |
| 233 } | |
| 234 | |
| 235 // Seek resets the compressor. | |
| 236 func (c *compressed) Seek(offset int64, whence int) (int64, error) { | |
| 237 » if offset != 0 || whence != 0 { | |
| 238 » » return 0, errors.New("compressed can only seek to 0") | |
| 239 » } | |
| 240 » err1 := c.Close() | |
| 241 » n, err2 := c.src.Seek(0, 0) | |
| 242 » c.reset() | |
| 243 » if err1 != nil { | |
| 244 » » return n, err1 | |
| 245 » } | |
| 246 » return n, err2 | |
| 247 } | |
| 248 | |
| 249 func (c *compressed) Read(p []byte) (int, error) { | |
| 250 » return c.r.Read(p) | |
| 251 } | |
| 252 | |
| 253 // reset restarts the compression loop. | |
| 254 func (c *compressed) reset() { | |
| 255 » var w *io.PipeWriter | |
| 256 » c.r, w = io.Pipe() | |
| 257 » c.wg.Add(1) | |
| 258 go func() { | 238 go func() { |
| 259 // The compressor itself is not thread safe. | 239 // The compressor itself is not thread safe. |
| 260 » » defer c.wg.Done() | 240 » » compressor := isolated.GetCompressor(pw) |
| 261 » » compressor := isolated.GetCompressor(w) | 241 |
| 262 » » _, err := io.Copy(compressor, c.src) | 242 » » buf := make([]byte, compressedBufSize) |
| 263 » » if err2 := compressor.Close(); err == nil { | 243 » » pw.CloseWithError(func() error { |
| 264 » » » err = err2 | 244 » » » if _, err := io.CopyBuffer(compressor, src, buf); err != nil { |
|
M-A Ruel
2016/08/19 17:23:34
I was rereading this code and do not understand wh
dnj
2016/08/19 22:33:14
I think my intent was to be able to control the bu
| |
| 265 » » } | 245 » » » » return err |
| 266 » » w.CloseWithError(err) | 246 » » » } |
| 247 » » » return compressor.Close() | |
| 248 » » }()) | |
| 267 }() | 249 }() |
| 250 | |
| 251 return &compressed{pr} | |
| 268 } | 252 } |
| OLD | NEW |