Chromium Code Reviews
| Index: client/isolatedclient/isolatedclient.go |
| diff --git a/client/isolatedclient/isolatedclient.go b/client/isolatedclient/isolatedclient.go |
| index 49f860aae3cb530b3ddee131c1ad5bfa0a3bde55..b64ff92b7ecbbbf00a324508bcd7ec5ba231450e 100644 |
| --- a/client/isolatedclient/isolatedclient.go |
| +++ b/client/isolatedclient/isolatedclient.go |
| @@ -12,7 +12,6 @@ import ( |
| "log" |
| "net/http" |
| "strings" |
| - "sync" |
| "github.com/luci/luci-go/client/internal/lhttp" |
| "github.com/luci/luci-go/client/internal/retry" |
| @@ -21,6 +20,22 @@ import ( |
| "github.com/luci/luci-go/common/isolated" |
| ) |
| +// compressedBufSize is the size of the read buffer that will be used to pull |
| +// data from a source into the compressor. |
| +const compressedBufSize = 4096 |
| + |
| +// Source is a generator method to return source data. A generated Source must |
| +// be Closed before the generator is called again. |
| +type Source func() (io.ReadCloser, error) |
| + |
| +// NewBytesSource returns a Source implementation that reads from the supplied |
| +// byte slice. |
| +func NewBytesSource(d []byte) Source { |
| + return func() (io.ReadCloser, error) { |
| + return ioutil.NopCloser(bytes.NewReader(d)), nil |
| + } |
| +} |
| + |
| // IsolateServer is the low-level client interface to interact with an Isolate |
| // server. |
| type IsolateServer interface { |
| @@ -30,7 +45,7 @@ type IsolateServer interface { |
| // The returned list is in the same order as 'items', with entries nil for |
| // items that were present. |
| Contains(items []*isolateservice.HandlersEndpointsV1Digest) ([]*PushState, error) |
| - Push(state *PushState, src io.ReadSeeker) error |
| + Push(state *PushState, src Source) error |
| } |
| // PushState is per-item state passed from IsolateServer.Contains() to |
| @@ -117,12 +132,12 @@ func (i *isolateServer) Contains(items []*isolateservice.HandlersEndpointsV1Dige |
| return out, nil |
| } |
| -func (i *isolateServer) Push(state *PushState, src io.ReadSeeker) (err error) { |
| +func (i *isolateServer) Push(state *PushState, source Source) (err error) { |
| // This push operation may be a retry after failed finalization call below, |
| // no need to reupload contents in that case. |
| if !state.uploaded { |
| // PUT file to uploadURL. |
| - if err = i.doPush(state, src); err != nil { |
| + if err = i.doPush(state, source); err != nil { |
| log.Printf("doPush(%s) failed: %s\n%#v", state.digest, err, state) |
| return |
| } |
| @@ -148,14 +163,20 @@ func (i *isolateServer) Push(state *PushState, src io.ReadSeeker) (err error) { |
| return |
| } |
| -func (i *isolateServer) doPush(state *PushState, src io.ReadSeeker) (err error) { |
| +func (i *isolateServer) doPush(state *PushState, source Source) (err error) { |
| useDB := state.status.GsUploadUrl == "" |
| end := tracer.Span(i, "push", tracer.Args{"useDB": useDB, "size": state.size}) |
| defer func() { end(tracer.Args{"err": err}) }() |
| if useDB { |
| + src, err := source() |
| + if err != nil { |
| + return err |
| + } |
| + defer src.Close() |
| + |
| err = i.doPushDB(state, src) |
| } else { |
| - err = i.doPushGCS(state, src) |
| + err = i.doPushGCS(state, source) |
| } |
| if err != nil { |
| tracer.CounterAdd(i, "bytesUploaded", float64(state.size)) |
| @@ -176,23 +197,26 @@ func (i *isolateServer) doPushDB(state *PushState, reader io.Reader) error { |
| return i.postJSON("/_ah/api/isolateservice/v1/store_inline", nil, in, nil) |
| } |
| -func (i *isolateServer) doPushGCS(state *PushState, src io.ReadSeeker) (err error) { |
| - c := newCompressed(src) |
| - defer func() { |
| - if err1 := c.Close(); err == nil { |
| - err = err1 |
| - } |
| - }() |
| +func (i *isolateServer) doPushGCS(state *PushState, source Source) (err error) { |
| // GsUploadUrl is signed Google Storage URL that doesn't require additional |
| // authentication. In fact, using authClient causes HTTP 403 because |
| // authClient's tokens don't have Cloud Storage OAuth scope. Use anonymous |
| // client instead. |
| - request, err2 := http.NewRequest("PUT", state.status.GsUploadUrl, c) |
| - if err2 != nil { |
| - return err2 |
| - } |
| - request.Header.Set("Content-Type", "application/octet-stream") |
| - req, err3 := lhttp.NewRequest(i.anonClient, request, func(resp *http.Response) error { |
| + req := lhttp.NewRequest(i.anonClient, func() (*http.Request, error) { |
| + src, err := source() |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + request, err := http.NewRequest("PUT", state.status.GsUploadUrl, nil) |
| + if err != nil { |
| + src.Close() |
| + return nil, err |
| + } |
| + request.Body = newCompressed(src) |
| + request.Header.Set("Content-Type", "application/octet-stream") |
| + return request, nil |
| + }, func(resp *http.Response) error { |
| _, err4 := io.Copy(ioutil.Discard, resp.Body) |
| err5 := resp.Body.Close() |
| if err4 != nil { |
| @@ -200,69 +224,29 @@ func (i *isolateServer) doPushGCS(state *PushState, src io.ReadSeeker) (err erro |
| } |
| return err5 |
| }) |
| - if err3 != nil { |
| - return err3 |
| - } |
| return i.config.Do(req) |
| } |
| -// compressed transparently compresses a source. |
| -// |
| -// It supports seeking to the beginning of the file to enable re-reading the |
| -// file multiple times. This is needed for HTTP retries. |
| +// compressed is an io.ReadCloser that transparently compresses source data in |
| +// a separate goroutine. |
| type compressed struct { |
| - src io.ReadSeeker |
| - wg sync.WaitGroup |
| - r io.ReadCloser |
| + io.ReadCloser |
| } |
| -func newCompressed(src io.ReadSeeker) *compressed { |
| - c := &compressed{src: src} |
| - c.reset() |
| - return c |
| -} |
| - |
| -func (c *compressed) Close() error { |
| - var err error |
| - if c.r != nil { |
| - err = c.r.Close() |
| - c.r = nil |
| - } |
| - c.wg.Wait() |
| - return err |
| -} |
| - |
| -// Seek resets the compressor. |
| -func (c *compressed) Seek(offset int64, whence int) (int64, error) { |
| - if offset != 0 || whence != 0 { |
| - return 0, errors.New("compressed can only seek to 0") |
| - } |
| - err1 := c.Close() |
| - n, err2 := c.src.Seek(0, 0) |
| - c.reset() |
| - if err1 != nil { |
| - return n, err1 |
| - } |
| - return n, err2 |
| -} |
| - |
| -func (c *compressed) Read(p []byte) (int, error) { |
| - return c.r.Read(p) |
| -} |
| - |
| -// reset restarts the compression loop. |
| -func (c *compressed) reset() { |
| - var w *io.PipeWriter |
| - c.r, w = io.Pipe() |
| - c.wg.Add(1) |
| +func newCompressed(src io.Reader) *compressed { |
| + pr, pw := io.Pipe() |
| go func() { |
| // The compressor itself is not thread safe. |
| - defer c.wg.Done() |
| - compressor := isolated.GetCompressor(w) |
| - _, err := io.Copy(compressor, c.src) |
| - if err2 := compressor.Close(); err == nil { |
| - err = err2 |
| - } |
| - w.CloseWithError(err) |
| + compressor := isolated.GetCompressor(pw) |
| + |
| + buf := make([]byte, compressedBufSize) |
| + pw.CloseWithError(func() error { |
| + if _, err := io.CopyBuffer(compressor, src, buf); err != nil { |
|
M-A Ruel
2016/08/19 17:23:34
I was rereading this code and do not understand why io.CopyBuffer with an explicit buffer is used here instead of a plain io.Copy.
dnj
2016/08/19 22:33:14
I think my intent was to be able to control the buffer size used when copying source data into the compressor.
|
| + return err |
| + } |
| + return compressor.Close() |
| + }()) |
| }() |
| + |
| + return &compressed{pr} |
| } |