| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. | 1 // Copyright 2016 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. | 13 // limitations under the License. |
| 14 | 14 |
| 15 package main | 15 package main |
| 16 | 16 |
| 17 import ( | 17 import ( |
| 18 "bytes" | 18 "bytes" |
| 19 "fmt" | 19 "fmt" |
| 20 "io" | 20 "io" |
| 21 "io/ioutil" | 21 "io/ioutil" |
| 22 "log" | 22 "log" |
| 23 "sync" | 23 "sync" |
| 24 | 24 |
| 25 "github.com/luci/luci-go/common/isolatedclient" | 25 "github.com/luci/luci-go/common/isolatedclient" |
| 26 | 26 |
| 27 "golang.org/x/net/context" | 27 "golang.org/x/net/context" |
| 28 ) | 28 ) |
| 29 | 29 |
| 30 // Uploader uses an isolatedclient.Client to upload items to the server. | 30 // Uploader uploads items to the server. |
| 31 // It has a single implementation, *ConcurrentUploader. See ConcurrentUploader f
or method documentation. |
| 32 type Uploader interface { |
| 33 » Close() error |
| 34 » Upload(name string, src isolatedclient.Source, ps *isolatedclient.PushSt
ate, done func()) |
| 35 » UploadBytes(name string, b []byte, ps *isolatedclient.PushState, done fu
nc()) |
| 36 » UploadFile(item *Item, ps *isolatedclient.PushState, done func()) |
| 37 } |
| 38 |
| 39 // ConcurrentUploader uses an isolatedclient.Client to upload items to the serve
r. |
| 31 // All methods are safe for concurrent use. | 40 // All methods are safe for concurrent use. |
| 32 type Uploader struct { | 41 type ConcurrentUploader struct { |
| 33 ctx context.Context | 42 ctx context.Context |
| 34 svc isolateService | 43 svc isolateService |
| 35 waitc chan bool // Used to cap concurrent uploads. | 44 waitc chan bool // Used to cap concurrent uploads. |
| 36 wg sync.WaitGroup | 45 wg sync.WaitGroup |
| 37 | 46 |
| 38 errMu sync.Mutex | 47 errMu sync.Mutex |
| 39 err error // The first error encountered, if any. | 48 err error // The first error encountered, if any. |
| 40 } | 49 } |
| 41 | 50 |
| 42 // NewUploader creates a new Uploader with the given isolated client. | 51 // NewUploader creates a new ConcurrentUploader with the given isolated client. |
| 43 // maxConcurrent controls maximum number of uploads to be in-flight at once. | 52 // maxConcurrent controls maximum number of uploads to be in-flight at once. |
| 44 // The provided context is used to make all requests to the isolate server. | 53 // The provided context is used to make all requests to the isolate server. |
| 45 func NewUploader(ctx context.Context, client *isolatedclient.Client, maxConcurre
nt int) *Uploader { | 54 func NewUploader(ctx context.Context, client *isolatedclient.Client, maxConcurre
nt int) *ConcurrentUploader { |
| 46 return newUploader(ctx, client, maxConcurrent) | 55 return newUploader(ctx, client, maxConcurrent) |
| 47 } | 56 } |
| 48 | 57 |
| 49 func newUploader(ctx context.Context, svc isolateService, maxConcurrent int) *Up
loader { | 58 func newUploader(ctx context.Context, svc isolateService, maxConcurrent int) *Co
ncurrentUploader { |
| 50 const concurrentUploads = 10 | 59 const concurrentUploads = 10 |
| 51 » return &Uploader{ | 60 » return &ConcurrentUploader{ |
| 52 ctx: ctx, | 61 ctx: ctx, |
| 53 svc: svc, | 62 svc: svc, |
| 54 waitc: make(chan bool, maxConcurrent), | 63 waitc: make(chan bool, maxConcurrent), |
| 55 } | 64 } |
| 56 } | 65 } |
| 57 | 66 |
| 58 // Upload uploads an item from an isolated.Source. Upload does not block. If | 67 // Upload uploads an item from an isolated.Source. Upload does not block. If |
| 59 // not-nil, the done func will be invoked on upload completion (both success | 68 // not-nil, the done func will be invoked on upload completion (both success |
| 60 // and failure). | 69 // and failure). |
| 61 func (u *Uploader) Upload(name string, src isolatedclient.Source, ps *isolatedcl
ient.PushState, done func()) { | 70 func (u *ConcurrentUploader) Upload(name string, src isolatedclient.Source, ps *
isolatedclient.PushState, done func()) { |
| 62 u.wg.Add(1) | 71 u.wg.Add(1) |
| 63 go u.upload(name, src, ps, done) | 72 go u.upload(name, src, ps, done) |
| 64 } | 73 } |
| 65 | 74 |
| 66 // UploadBytes uploads an item held in-memory. UploadBytes does not block. If | 75 // UploadBytes uploads an item held in-memory. UploadBytes does not block. If |
| 67 // not-nil, the done func will be invoked on upload completion (both success | 76 // not-nil, the done func will be invoked on upload completion (both success |
| 68 // and failure). The provided byte slice b must not be modified until the | 77 // and failure). The provided byte slice b must not be modified until the |
| 69 // upload is completed. | 78 // upload is completed. |
| 70 // TODO(djd): Consider using Upload directly and deleting UploadBytes. | 79 // TODO(djd): Consider using Upload directly and deleting UploadBytes. |
| 71 func (u *Uploader) UploadBytes(name string, b []byte, ps *isolatedclient.PushSta
te, done func()) { | 80 func (u *ConcurrentUploader) UploadBytes(name string, b []byte, ps *isolatedclie
nt.PushState, done func()) { |
| 72 u.wg.Add(1) | 81 u.wg.Add(1) |
| 73 go u.upload(name, byteSource(b), ps, done) | 82 go u.upload(name, byteSource(b), ps, done) |
| 74 } | 83 } |
| 75 | 84 |
| 76 // UploadFile uploads a file from disk. UploadFile does not block. If | 85 // UploadFile uploads a file from disk. UploadFile does not block. If |
| 77 // not-nil, the done func will be invoked on upload completion (both success | 86 // not-nil, the done func will be invoked on upload completion (both success |
| 78 // and failure). | 87 // and failure). |
| 79 // TODO(djd): Consider using Upload directly and deleting UploadFile. | 88 // TODO(djd): Consider using Upload directly and deleting UploadFile. |
| 80 func (u *Uploader) UploadFile(item *Item, ps *isolatedclient.PushState, done fun
c()) { | 89 func (u *ConcurrentUploader) UploadFile(item *Item, ps *isolatedclient.PushState
, done func()) { |
| 81 u.wg.Add(1) | 90 u.wg.Add(1) |
| 82 go u.upload(item.RelPath, fileSource(item.Path), ps, done) | 91 go u.upload(item.RelPath, fileSource(item.Path), ps, done) |
| 83 } | 92 } |
| 84 | 93 |
| 85 // Close waits for any pending uploads (and associated done callbacks) to | 94 // Close waits for any pending uploads (and associated done callbacks) to |
| 86 // complete, and returns the first encountered error if any. | 95 // complete, and returns the first encountered error if any. |
| 87 // Uploader cannot be used once it is closed. | 96 // Uploader cannot be used once it is closed. |
| 88 func (u *Uploader) Close() error { | 97 func (u *ConcurrentUploader) Close() error { |
| 89 u.wg.Wait() | 98 u.wg.Wait() |
| 90 close(u.waitc) // Sanity check that we don't do any more uploading. | 99 close(u.waitc) // Sanity check that we don't do any more uploading. |
| 91 return u.err | 100 return u.err |
| 92 } | 101 } |
| 93 | 102 |
| 94 func (u *Uploader) upload(name string, src isolatedclient.Source, ps *isolatedcl
ient.PushState, done func()) { | 103 func (u *ConcurrentUploader) upload(name string, src isolatedclient.Source, ps *
isolatedclient.PushState, done func()) { |
| 95 u.waitc <- true | 104 u.waitc <- true |
| 96 defer func() { | 105 defer func() { |
| 97 <-u.waitc | 106 <-u.waitc |
| 98 }() | 107 }() |
| 99 defer u.wg.Done() | 108 defer u.wg.Done() |
| 100 if done != nil { | 109 if done != nil { |
| 101 defer done() | 110 defer done() |
| 102 } | 111 } |
| 103 | 112 |
| 104 // Bail out early if there already was an error. | 113 // Bail out early if there already was an error. |
| 105 if u.getErr() != nil { | 114 if u.getErr() != nil { |
| 106 log.Printf("WARNING dropped %q from Uploader", name) | 115 log.Printf("WARNING dropped %q from Uploader", name) |
| 107 return | 116 return |
| 108 } | 117 } |
| 109 | 118 |
| 110 err := u.svc.Push(u.ctx, ps, src) | 119 err := u.svc.Push(u.ctx, ps, src) |
| 111 if err != nil { | 120 if err != nil { |
| 112 u.setErr(fmt.Errorf("pushing %q: %v", name, err)) | 121 u.setErr(fmt.Errorf("pushing %q: %v", name, err)) |
| 113 } | 122 } |
| 114 } | 123 } |
| 115 | 124 |
| 116 func (u *Uploader) getErr() error { | 125 func (u *ConcurrentUploader) getErr() error { |
| 117 u.errMu.Lock() | 126 u.errMu.Lock() |
| 118 defer u.errMu.Unlock() | 127 defer u.errMu.Unlock() |
| 119 return u.err | 128 return u.err |
| 120 } | 129 } |
| 121 | 130 |
| 122 func (u *Uploader) setErr(err error) { | 131 func (u *ConcurrentUploader) setErr(err error) { |
| 123 u.errMu.Lock() | 132 u.errMu.Lock() |
| 124 defer u.errMu.Unlock() | 133 defer u.errMu.Unlock() |
| 125 if u.err == nil { | 134 if u.err == nil { |
| 126 u.err = err | 135 u.err = err |
| 127 } | 136 } |
| 128 } | 137 } |
| 129 | 138 |
| 130 func byteSource(b []byte) isolatedclient.Source { | 139 func byteSource(b []byte) isolatedclient.Source { |
| 131 return func() (io.ReadCloser, error) { | 140 return func() (io.ReadCloser, error) { |
| 132 return ioutil.NopCloser(bytes.NewReader(b)), nil | 141 return ioutil.NopCloser(bytes.NewReader(b)), nil |
| 133 } | 142 } |
| 134 } | 143 } |
| 135 | 144 |
| 136 func fileSource(path string) isolatedclient.Source { | 145 func fileSource(path string) isolatedclient.Source { |
| 137 return func() (io.ReadCloser, error) { | 146 return func() (io.ReadCloser, error) { |
| 138 return osOpen(path) | 147 return osOpen(path) |
| 139 } | 148 } |
| 140 } | 149 } |
| OLD | NEW |