Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(32)

Unified Diff: client/isolatedclient/isolatedclient.go

Issue 1846263002: Isolate: Use generators instead of seekers (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Tweaks from comments. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « client/isolate/isolate.go ('k') | client/isolatedclient/isolatedclient_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/isolatedclient/isolatedclient.go
diff --git a/client/isolatedclient/isolatedclient.go b/client/isolatedclient/isolatedclient.go
index 49f860aae3cb530b3ddee131c1ad5bfa0a3bde55..b64ff92b7ecbbbf00a324508bcd7ec5ba231450e 100644
--- a/client/isolatedclient/isolatedclient.go
+++ b/client/isolatedclient/isolatedclient.go
@@ -12,7 +12,6 @@ import (
"log"
"net/http"
"strings"
- "sync"
"github.com/luci/luci-go/client/internal/lhttp"
"github.com/luci/luci-go/client/internal/retry"
@@ -21,6 +20,22 @@ import (
"github.com/luci/luci-go/common/isolated"
)
+// compressedBufSize is the size of the read buffer that will be used to pull
+// data from a source into the compressor.
+const compressedBufSize = 4096
+
+// Source is a generator method to return source data. A generated Source must
+// be Closed before the generator is called again.
+type Source func() (io.ReadCloser, error)
+
+// NewBytesSource returns a Source implementation that reads from the supplied
+// byte slice.
+func NewBytesSource(d []byte) Source {
+ return func() (io.ReadCloser, error) {
+ return ioutil.NopCloser(bytes.NewReader(d)), nil
+ }
+}
+
// IsolateServer is the low-level client interface to interact with an Isolate
// server.
type IsolateServer interface {
@@ -30,7 +45,7 @@ type IsolateServer interface {
// The returned list is in the same order as 'items', with entries nil for
// items that were present.
Contains(items []*isolateservice.HandlersEndpointsV1Digest) ([]*PushState, error)
- Push(state *PushState, src io.ReadSeeker) error
+ Push(state *PushState, src Source) error
}
// PushState is per-item state passed from IsolateServer.Contains() to
@@ -117,12 +132,12 @@ func (i *isolateServer) Contains(items []*isolateservice.HandlersEndpointsV1Dige
return out, nil
}
-func (i *isolateServer) Push(state *PushState, src io.ReadSeeker) (err error) {
+func (i *isolateServer) Push(state *PushState, source Source) (err error) {
// This push operation may be a retry after failed finalization call below,
// no need to reupload contents in that case.
if !state.uploaded {
// PUT file to uploadURL.
- if err = i.doPush(state, src); err != nil {
+ if err = i.doPush(state, source); err != nil {
log.Printf("doPush(%s) failed: %s\n%#v", state.digest, err, state)
return
}
@@ -148,14 +163,20 @@ func (i *isolateServer) Push(state *PushState, src io.ReadSeeker) (err error) {
return
}
-func (i *isolateServer) doPush(state *PushState, src io.ReadSeeker) (err error) {
+func (i *isolateServer) doPush(state *PushState, source Source) (err error) {
useDB := state.status.GsUploadUrl == ""
end := tracer.Span(i, "push", tracer.Args{"useDB": useDB, "size": state.size})
defer func() { end(tracer.Args{"err": err}) }()
if useDB {
+ src, err := source()
+ if err != nil {
+ return err
+ }
+ defer src.Close()
+
err = i.doPushDB(state, src)
} else {
- err = i.doPushGCS(state, src)
+ err = i.doPushGCS(state, source)
}
if err != nil {
tracer.CounterAdd(i, "bytesUploaded", float64(state.size))
@@ -176,23 +197,26 @@ func (i *isolateServer) doPushDB(state *PushState, reader io.Reader) error {
return i.postJSON("/_ah/api/isolateservice/v1/store_inline", nil, in, nil)
}
-func (i *isolateServer) doPushGCS(state *PushState, src io.ReadSeeker) (err error) {
- c := newCompressed(src)
- defer func() {
- if err1 := c.Close(); err == nil {
- err = err1
- }
- }()
+func (i *isolateServer) doPushGCS(state *PushState, source Source) (err error) {
// GsUploadUrl is signed Google Storage URL that doesn't require additional
// authentication. In fact, using authClient causes HTTP 403 because
// authClient's tokens don't have Cloud Storage OAuth scope. Use anonymous
// client instead.
- request, err2 := http.NewRequest("PUT", state.status.GsUploadUrl, c)
- if err2 != nil {
- return err2
- }
- request.Header.Set("Content-Type", "application/octet-stream")
- req, err3 := lhttp.NewRequest(i.anonClient, request, func(resp *http.Response) error {
+ req := lhttp.NewRequest(i.anonClient, func() (*http.Request, error) {
+ src, err := source()
+ if err != nil {
+ return nil, err
+ }
+
+ request, err := http.NewRequest("PUT", state.status.GsUploadUrl, nil)
+ if err != nil {
+ src.Close()
+ return nil, err
+ }
+ request.Body = newCompressed(src)
+ request.Header.Set("Content-Type", "application/octet-stream")
+ return request, nil
+ }, func(resp *http.Response) error {
_, err4 := io.Copy(ioutil.Discard, resp.Body)
err5 := resp.Body.Close()
if err4 != nil {
@@ -200,69 +224,29 @@ func (i *isolateServer) doPushGCS(state *PushState, src io.ReadSeeker) (err erro
}
return err5
})
- if err3 != nil {
- return err3
- }
return i.config.Do(req)
}
-// compressed transparently compresses a source.
-//
-// It supports seeking to the beginning of the file to enable re-reading the
-// file multiple times. This is needed for HTTP retries.
+// compressed is an io.ReadCloser that transparently compresses source data in
+// a separate goroutine.
type compressed struct {
- src io.ReadSeeker
- wg sync.WaitGroup
- r io.ReadCloser
+ io.ReadCloser
}
-func newCompressed(src io.ReadSeeker) *compressed {
- c := &compressed{src: src}
- c.reset()
- return c
-}
-
-func (c *compressed) Close() error {
- var err error
- if c.r != nil {
- err = c.r.Close()
- c.r = nil
- }
- c.wg.Wait()
- return err
-}
-
-// Seek resets the compressor.
-func (c *compressed) Seek(offset int64, whence int) (int64, error) {
- if offset != 0 || whence != 0 {
- return 0, errors.New("compressed can only seek to 0")
- }
- err1 := c.Close()
- n, err2 := c.src.Seek(0, 0)
- c.reset()
- if err1 != nil {
- return n, err1
- }
- return n, err2
-}
-
-func (c *compressed) Read(p []byte) (int, error) {
- return c.r.Read(p)
-}
-
-// reset restarts the compression loop.
-func (c *compressed) reset() {
- var w *io.PipeWriter
- c.r, w = io.Pipe()
- c.wg.Add(1)
+func newCompressed(src io.Reader) *compressed {
+ pr, pw := io.Pipe()
go func() {
// The compressor itself is not thread safe.
- defer c.wg.Done()
- compressor := isolated.GetCompressor(w)
- _, err := io.Copy(compressor, c.src)
- if err2 := compressor.Close(); err == nil {
- err = err2
- }
- w.CloseWithError(err)
+ compressor := isolated.GetCompressor(pw)
+
+ buf := make([]byte, compressedBufSize)
+ pw.CloseWithError(func() error {
+ if _, err := io.CopyBuffer(compressor, src, buf); err != nil {
M-A Ruel 2016/08/19 17:23:34 I was rereading this code and do not understand wh
dnj 2016/08/19 22:33:14 I think my intent was to be able to control the bu
+ return err
+ }
+ return compressor.Close()
+ }())
}()
+
+ return &compressed{pr}
}
« no previous file with comments | « client/isolate/isolate.go ('k') | client/isolatedclient/isolatedclient_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698