Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package isolatedclient | 5 package isolatedclient |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "io" | 10 "io" |
| 11 "io/ioutil" | 11 "io/ioutil" |
| 12 "log" | 12 "log" |
| 13 "net/http" | 13 "net/http" |
| 14 "strings" | 14 "strings" |
| 15 "sync" | |
| 16 | 15 |
| 17 "github.com/luci/luci-go/client/internal/lhttp" | 16 "github.com/luci/luci-go/client/internal/lhttp" |
| 18 "github.com/luci/luci-go/client/internal/retry" | 17 "github.com/luci/luci-go/client/internal/retry" |
| 19 "github.com/luci/luci-go/client/internal/tracer" | 18 "github.com/luci/luci-go/client/internal/tracer" |
| 20 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" | 19 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" |
| 21 "github.com/luci/luci-go/common/isolated" | 20 "github.com/luci/luci-go/common/isolated" |
| 22 ) | 21 ) |
| 23 | 22 |
| 23 const compressedBufSize = 4096 | |
|
M-A Ruel
2016/04/01 17:26:04
I don't think it's worth a named variable.
dnj
2016/04/01 18:49:11
I did this more in the spirit of having compile-ti
| |
| 24 | |
| 25 // Source is a generator method to return source data. A generated Source must | |
| 26 // be Closed before the generator is called again. | |
| 27 type Source func() (io.ReadCloser, error) | |
| 28 | |
| 29 // NewBytesSource returns a Source implementation that reads from the supplied | |
| 30 // byte slice. | |
| 31 func NewBytesSource(d []byte) Source { | |
| 32 return func() (io.ReadCloser, error) { | |
| 33 return ioutil.NopCloser(bytes.NewReader(d)), nil | |
| 34 } | |
| 35 } | |
| 36 | |
| 24 // IsolateServer is the low-level client interface to interact with an Isolate | 37 // IsolateServer is the low-level client interface to interact with an Isolate |
| 25 // server. | 38 // server. |
| 26 type IsolateServer interface { | 39 type IsolateServer interface { |
| 27 ServerCapabilities() (*isolateservice.HandlersEndpointsV1ServerDetails, error) | 40 ServerCapabilities() (*isolateservice.HandlersEndpointsV1ServerDetails, error) |
| 28 // Contains looks up cache presence on the server of multiple items. | 41 // Contains looks up cache presence on the server of multiple items. |
| 29 // | 42 // |
| 30 // The returned list is in the same order as 'items', with entries nil f or | 43 // The returned list is in the same order as 'items', with entries nil f or |
| 31 // items that were present. | 44 // items that were present. |
| 32 Contains(items []*isolateservice.HandlersEndpointsV1Digest) ([]*PushStat e, error) | 45 Contains(items []*isolateservice.HandlersEndpointsV1Digest) ([]*PushStat e, error) |
| 33 » Push(state *PushState, src io.ReadSeeker) error | 46 » Push(state *PushState, src Source) error |
| 34 } | 47 } |
| 35 | 48 |
| 36 // PushState is per-item state passed from IsolateServer.Contains() to | 49 // PushState is per-item state passed from IsolateServer.Contains() to |
| 37 // IsolateServer.Push(). | 50 // IsolateServer.Push(). |
| 38 // | 51 // |
| 39 // Its content is implementation specific. | 52 // Its content is implementation specific. |
| 40 type PushState struct { | 53 type PushState struct { |
| 41 status isolateservice.HandlersEndpointsV1PreuploadStatus | 54 status isolateservice.HandlersEndpointsV1PreuploadStatus |
| 42 digest isolated.HexDigest | 55 digest isolated.HexDigest |
| 43 size int64 | 56 size int64 |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 110 index := int(e.Index) | 123 index := int(e.Index) |
| 111 out[index] = &PushState{ | 124 out[index] = &PushState{ |
| 112 status: *e, | 125 status: *e, |
| 113 digest: isolated.HexDigest(items[index].Digest), | 126 digest: isolated.HexDigest(items[index].Digest), |
| 114 size: items[index].Size, | 127 size: items[index].Size, |
| 115 } | 128 } |
| 116 } | 129 } |
| 117 return out, nil | 130 return out, nil |
| 118 } | 131 } |
| 119 | 132 |
| 120 func (i *isolateServer) Push(state *PushState, src io.ReadSeeker) (err error) { | 133 func (i *isolateServer) Push(state *PushState, source Source) (err error) { |
| 121 // This push operation may be a retry after failed finalization call bel ow, | 134 // This push operation may be a retry after failed finalization call bel ow, |
| 122 // no need to reupload contents in that case. | 135 // no need to reupload contents in that case. |
| 123 if !state.uploaded { | 136 if !state.uploaded { |
| 124 // PUT file to uploadURL. | 137 // PUT file to uploadURL. |
| 125 » » if err = i.doPush(state, src); err != nil { | 138 » » if err = i.doPush(state, source); err != nil { |
| 126 log.Printf("doPush(%s) failed: %s\n%#v", state.digest, e rr, state) | 139 log.Printf("doPush(%s) failed: %s\n%#v", state.digest, e rr, state) |
| 127 return | 140 return |
| 128 } | 141 } |
| 129 state.uploaded = true | 142 state.uploaded = true |
| 130 } | 143 } |
| 131 | 144 |
| 132 // Optionally notify the server that it's done. | 145 // Optionally notify the server that it's done. |
| 133 if state.status.GsUploadUrl != "" { | 146 if state.status.GsUploadUrl != "" { |
| 134 end := tracer.Span(i, "finalize", nil) | 147 end := tracer.Span(i, "finalize", nil) |
| 135 defer func() { end(tracer.Args{"err": err}) }() | 148 defer func() { end(tracer.Args{"err": err}) }() |
| 136 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and | 149 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and |
| 137 // send it to isolated server. That way isolate server can verif y that | 150 // send it to isolated server. That way isolate server can verif y that |
| 138 // the data safely reached Google Storage (GS provides MD5 and C RC32C of | 151 // the data safely reached Google Storage (GS provides MD5 and C RC32C of |
| 139 // stored files). | 152 // stored files). |
| 140 in := isolateservice.HandlersEndpointsV1FinalizeRequest{UploadTi cket: state.status.UploadTicket} | 153 in := isolateservice.HandlersEndpointsV1FinalizeRequest{UploadTi cket: state.status.UploadTicket} |
| 141 headers := map[string]string{"Cache-Control": "public, max-age=3 1536000"} | 154 headers := map[string]string{"Cache-Control": "public, max-age=3 1536000"} |
| 142 if err = i.postJSON("/_ah/api/isolateservice/v1/finalize_gs_uplo ad", headers, in, nil); err != nil { | 155 if err = i.postJSON("/_ah/api/isolateservice/v1/finalize_gs_uplo ad", headers, in, nil); err != nil { |
| 143 log.Printf("Push(%s) (finalize) failed: %s\n%#v", state. digest, err, state) | 156 log.Printf("Push(%s) (finalize) failed: %s\n%#v", state. digest, err, state) |
| 144 return | 157 return |
| 145 } | 158 } |
| 146 } | 159 } |
| 147 state.finalized = true | 160 state.finalized = true |
| 148 return | 161 return |
| 149 } | 162 } |
| 150 | 163 |
| 151 func (i *isolateServer) doPush(state *PushState, src io.ReadSeeker) (err error) { | 164 func (i *isolateServer) doPush(state *PushState, source Source) (err error) { |
| 152 useDB := state.status.GsUploadUrl == "" | 165 useDB := state.status.GsUploadUrl == "" |
| 153 end := tracer.Span(i, "push", tracer.Args{"useDB": useDB, "size": state. size}) | 166 end := tracer.Span(i, "push", tracer.Args{"useDB": useDB, "size": state. size}) |
| 154 defer func() { end(tracer.Args{"err": err}) }() | 167 defer func() { end(tracer.Args{"err": err}) }() |
| 155 if useDB { | 168 if useDB { |
| 169 src, err := source() | |
| 170 if err != nil { | |
| 171 return err | |
| 172 } | |
| 173 defer src.Close() | |
| 174 | |
| 156 err = i.doPushDB(state, src) | 175 err = i.doPushDB(state, src) |
| 157 } else { | 176 } else { |
| 158 » » err = i.doPushGCS(state, src) | 177 » » err = i.doPushGCS(state, source) |
| 159 } | 178 } |
| 160 if err != nil { | 179 if err != nil { |
| 161 tracer.CounterAdd(i, "bytesUploaded", float64(state.size)) | 180 tracer.CounterAdd(i, "bytesUploaded", float64(state.size)) |
| 162 } | 181 } |
| 163 return err | 182 return err |
| 164 } | 183 } |
| 165 | 184 |
| 166 func (i *isolateServer) doPushDB(state *PushState, reader io.Reader) error { | 185 func (i *isolateServer) doPushDB(state *PushState, reader io.Reader) error { |
| 167 buf := bytes.Buffer{} | 186 buf := bytes.Buffer{} |
| 168 compressor := isolated.GetCompressor(&buf) | 187 compressor := isolated.GetCompressor(&buf) |
| 169 if _, err := io.Copy(compressor, reader); err != nil { | 188 if _, err := io.Copy(compressor, reader); err != nil { |
| 170 return err | 189 return err |
| 171 } | 190 } |
| 172 if err := compressor.Close(); err != nil { | 191 if err := compressor.Close(); err != nil { |
| 173 return err | 192 return err |
| 174 } | 193 } |
| 175 in := &isolateservice.HandlersEndpointsV1StorageRequest{UploadTicket: st ate.status.UploadTicket, Content: buf.Bytes()} | 194 in := &isolateservice.HandlersEndpointsV1StorageRequest{UploadTicket: st ate.status.UploadTicket, Content: buf.Bytes()} |
| 176 return i.postJSON("/_ah/api/isolateservice/v1/store_inline", nil, in, ni l) | 195 return i.postJSON("/_ah/api/isolateservice/v1/store_inline", nil, in, ni l) |
| 177 } | 196 } |
| 178 | 197 |
| 179 func (i *isolateServer) doPushGCS(state *PushState, src io.ReadSeeker) (err erro r) { | 198 func (i *isolateServer) doPushGCS(state *PushState, source Source) (err error) { |
| 180 » c := newCompressed(src) | |
| 181 » defer func() { | |
| 182 » » if err1 := c.Close(); err == nil { | |
| 183 » » » err = err1 | |
| 184 » » } | |
| 185 » }() | |
| 186 // GsUploadUrl is signed Google Storage URL that doesn't require additio nal | 199 // GsUploadUrl is signed Google Storage URL that doesn't require additio nal |
| 187 // authentication. In fact, using authClient causes HTTP 403 because | 200 // authentication. In fact, using authClient causes HTTP 403 because |
| 188 // authClient's tokens don't have Cloud Storage OAuth scope. Use anonymo us | 201 // authClient's tokens don't have Cloud Storage OAuth scope. Use anonymo us |
| 189 // client instead. | 202 // client instead. |
| 190 » request, err2 := http.NewRequest("PUT", state.status.GsUploadUrl, c) | 203 » return i.config.Do(lhttp.NewRequest(i.anonClient, func() (*http.Request, error) { |
|
M-A Ruel
2016/04/01 17:26:04
I think I'd prefer to use a named variable for the
dnj
2016/04/01 18:49:12
Hah I one-lined it specifically thinking you'd pre
| |
| 191 » if err2 != nil { | 204 » » src, err := source() |
| 192 » » return err2 | 205 » » if err != nil { |
| 193 » } | 206 » » » return nil, err |
| 194 » request.Header.Set("Content-Type", "application/octet-stream") | 207 » » } |
| 195 » req, err3 := lhttp.NewRequest(i.anonClient, request, func(resp *http.Res ponse) error { | 208 |
| 209 » » request, err := http.NewRequest("PUT", state.status.GsUploadUrl, nil) | |
| 210 » » if err != nil { | |
| 211 » » » return nil, err | |
|
M-A Ruel
2016/04/01 17:26:04
you need to close src in that case.
dnj
2016/04/01 18:49:12
epp, good catch. Done.
| |
| 212 » » } | |
| 213 » » request.Body = newCompressed(src) | |
| 214 » » request.Header.Set("Content-Type", "application/octet-stream") | |
| 215 » » return request, nil | |
| 216 » }, func(resp *http.Response) error { | |
| 196 _, err4 := io.Copy(ioutil.Discard, resp.Body) | 217 _, err4 := io.Copy(ioutil.Discard, resp.Body) |
| 197 err5 := resp.Body.Close() | 218 err5 := resp.Body.Close() |
| 198 if err4 != nil { | 219 if err4 != nil { |
| 199 return err4 | 220 return err4 |
| 200 } | 221 } |
| 201 return err5 | 222 return err5 |
| 202 » }) | 223 » })) |
| 203 » if err3 != nil { | |
| 204 » » return err3 | |
| 205 » } | |
| 206 » return i.config.Do(req) | |
| 207 } | 224 } |
| 208 | 225 |
| 209 // compressed transparently compresses a source. | 226 // compressed is an io.ReadCloser that transparently compresses source data in |
| 210 // | 227 // a separate goroutine. |
| 211 // It supports seeking to the beginning of the file to enable re-reading the | |
| 212 // file multiple times. This is needed for HTTP retries. | |
| 213 type compressed struct { | 228 type compressed struct { |
| 214 » src io.ReadSeeker | 229 » io.ReadCloser |
| 215 » wg sync.WaitGroup | |
| 216 » r io.ReadCloser | |
| 217 } | 230 } |
| 218 | 231 |
| 219 func newCompressed(src io.ReadSeeker) *compressed { | 232 func newCompressed(src io.Reader) *compressed { |
|
M-A Ruel
2016/04/01 17:26:04
The previous code was contrived mostly to save on
dnj
2016/04/01 18:49:11
Understood.
| |
| 220 » c := &compressed{src: src} | 233 » pr, pw := io.Pipe() |
| 221 » c.reset() | |
| 222 » return c | |
| 223 } | |
| 224 | |
| 225 func (c *compressed) Close() error { | |
| 226 » var err error | |
| 227 » if c.r != nil { | |
| 228 » » err = c.r.Close() | |
| 229 » » c.r = nil | |
| 230 » } | |
| 231 » c.wg.Wait() | |
| 232 » return err | |
| 233 } | |
| 234 | |
| 235 // Seek resets the compressor. | |
| 236 func (c *compressed) Seek(offset int64, whence int) (int64, error) { | |
| 237 » if offset != 0 || whence != 0 { | |
| 238 » » return 0, errors.New("compressed can only seek to 0") | |
| 239 » } | |
| 240 » err1 := c.Close() | |
| 241 » n, err2 := c.src.Seek(0, 0) | |
| 242 » c.reset() | |
| 243 » if err1 != nil { | |
| 244 » » return n, err1 | |
| 245 » } | |
| 246 » return n, err2 | |
| 247 } | |
| 248 | |
| 249 func (c *compressed) Read(p []byte) (int, error) { | |
| 250 » return c.r.Read(p) | |
| 251 } | |
| 252 | |
| 253 // reset restarts the compression loop. | |
| 254 func (c *compressed) reset() { | |
| 255 » var w *io.PipeWriter | |
| 256 » c.r, w = io.Pipe() | |
| 257 » c.wg.Add(1) | |
| 258 go func() { | 234 go func() { |
| 259 // The compressor itself is not thread safe. | 235 // The compressor itself is not thread safe. |
| 260 » » defer c.wg.Done() | 236 » » compressor := isolated.GetCompressor(pw) |
| 261 » » compressor := isolated.GetCompressor(w) | 237 |
| 262 » » _, err := io.Copy(compressor, c.src) | 238 » » buf := make([]byte, compressedBufSize) |
| 263 » » if err2 := compressor.Close(); err == nil { | 239 » » pw.CloseWithError(func() error { |
| 264 » » » err = err2 | 240 » » » if _, err := io.CopyBuffer(compressor, src, buf); err != nil { |
| 265 » » } | 241 » » » » return err |
| 266 » » w.CloseWithError(err) | 242 » » » } |
| 243 » » » return compressor.Close() | |
| 244 » » }()) | |
| 267 }() | 245 }() |
| 246 | |
| 247 return &compressed{pr} | |
| 268 } | 248 } |
| OLD | NEW |