Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(54)

Side by Side Diff: client/cmd/isolate/uploader.go

Issue 2992693002: isolate: add uploadtracker tests for symlinks and isolated files (Closed)
Patch Set: address review comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/cmd/isolate/upload_tracker_test.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. 1 // Copyright 2016 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and 12 // See the License for the specific language governing permissions and
13 // limitations under the License. 13 // limitations under the License.
14 14
15 package main 15 package main
16 16
17 import ( 17 import (
18 "bytes" 18 "bytes"
19 "fmt" 19 "fmt"
20 "io" 20 "io"
21 "io/ioutil" 21 "io/ioutil"
22 "log" 22 "log"
23 "sync" 23 "sync"
24 24
25 "github.com/luci/luci-go/common/isolatedclient" 25 "github.com/luci/luci-go/common/isolatedclient"
26 26
27 "golang.org/x/net/context" 27 "golang.org/x/net/context"
28 ) 28 )
29 29
30 // Uploader uses an isolatedclient.Client to upload items to the server. 30 // Uploader uploads items to the server.
31 // It has a single implementation, *ConcurrentUploader. See ConcurrentUploader f or method documentation.
32 type Uploader interface {
33 » Close() error
34 » Upload(name string, src isolatedclient.Source, ps *isolatedclient.PushSt ate, done func())
35 » UploadBytes(name string, b []byte, ps *isolatedclient.PushState, done fu nc())
36 » UploadFile(item *Item, ps *isolatedclient.PushState, done func())
37 }
38
39 // ConcurrentUploader uses an isolatedclient.Client to upload items to the serve r.
31 // All methods are safe for concurrent use. 40 // All methods are safe for concurrent use.
32 type Uploader struct { 41 type ConcurrentUploader struct {
33 ctx context.Context 42 ctx context.Context
34 svc isolateService 43 svc isolateService
35 waitc chan bool // Used to cap concurrent uploads. 44 waitc chan bool // Used to cap concurrent uploads.
36 wg sync.WaitGroup 45 wg sync.WaitGroup
37 46
38 errMu sync.Mutex 47 errMu sync.Mutex
39 err error // The first error encountered, if any. 48 err error // The first error encountered, if any.
40 } 49 }
41 50
42 // NewUploader creates a new Uploader with the given isolated client. 51 // NewUploader creates a new ConcurrentUploader with the given isolated client.
43 // maxConcurrent controls maximum number of uploads to be in-flight at once. 52 // maxConcurrent controls maximum number of uploads to be in-flight at once.
44 // The provided context is used to make all requests to the isolate server. 53 // The provided context is used to make all requests to the isolate server.
45 func NewUploader(ctx context.Context, client *isolatedclient.Client, maxConcurre nt int) *Uploader { 54 func NewUploader(ctx context.Context, client *isolatedclient.Client, maxConcurre nt int) *ConcurrentUploader {
46 return newUploader(ctx, client, maxConcurrent) 55 return newUploader(ctx, client, maxConcurrent)
47 } 56 }
48 57
49 func newUploader(ctx context.Context, svc isolateService, maxConcurrent int) *Up loader { 58 func newUploader(ctx context.Context, svc isolateService, maxConcurrent int) *Co ncurrentUploader {
50 const concurrentUploads = 10 59 const concurrentUploads = 10
51 » return &Uploader{ 60 » return &ConcurrentUploader{
52 ctx: ctx, 61 ctx: ctx,
53 svc: svc, 62 svc: svc,
54 waitc: make(chan bool, maxConcurrent), 63 waitc: make(chan bool, maxConcurrent),
55 } 64 }
56 } 65 }
57 66
58 // Upload uploads an item from an isolated.Source. Upload does not block. If 67 // Upload uploads an item from an isolated.Source. Upload does not block. If
59 // not-nil, the done func will be invoked on upload completion (both success 68 // not-nil, the done func will be invoked on upload completion (both success
60 // and failure). 69 // and failure).
61 func (u *Uploader) Upload(name string, src isolatedclient.Source, ps *isolatedcl ient.PushState, done func()) { 70 func (u *ConcurrentUploader) Upload(name string, src isolatedclient.Source, ps * isolatedclient.PushState, done func()) {
62 u.wg.Add(1) 71 u.wg.Add(1)
63 go u.upload(name, src, ps, done) 72 go u.upload(name, src, ps, done)
64 } 73 }
65 74
66 // UploadBytes uploads an item held in-memory. UploadBytes does not block. If 75 // UploadBytes uploads an item held in-memory. UploadBytes does not block. If
67 // not-nil, the done func will be invoked on upload completion (both success 76 // not-nil, the done func will be invoked on upload completion (both success
68 // and failure). The provided byte slice b must not be modified until the 77 // and failure). The provided byte slice b must not be modified until the
69 // upload is completed. 78 // upload is completed.
70 // TODO(djd): Consider using Upload directly and deleting UploadBytes. 79 // TODO(djd): Consider using Upload directly and deleting UploadBytes.
71 func (u *Uploader) UploadBytes(name string, b []byte, ps *isolatedclient.PushSta te, done func()) { 80 func (u *ConcurrentUploader) UploadBytes(name string, b []byte, ps *isolatedclie nt.PushState, done func()) {
72 u.wg.Add(1) 81 u.wg.Add(1)
73 go u.upload(name, byteSource(b), ps, done) 82 go u.upload(name, byteSource(b), ps, done)
74 } 83 }
75 84
76 // UploadFile uploads a file from disk. UploadFile does not block. If 85 // UploadFile uploads a file from disk. UploadFile does not block. If
77 // not-nil, the done func will be invoked on upload completion (both success 86 // not-nil, the done func will be invoked on upload completion (both success
78 // and failure). 87 // and failure).
79 // TODO(djd): Consider using Upload directly and deleting UploadFile. 88 // TODO(djd): Consider using Upload directly and deleting UploadFile.
80 func (u *Uploader) UploadFile(item *Item, ps *isolatedclient.PushState, done fun c()) { 89 func (u *ConcurrentUploader) UploadFile(item *Item, ps *isolatedclient.PushState , done func()) {
81 u.wg.Add(1) 90 u.wg.Add(1)
82 go u.upload(item.RelPath, fileSource(item.Path), ps, done) 91 go u.upload(item.RelPath, fileSource(item.Path), ps, done)
83 } 92 }
84 93
85 // Close waits for any pending uploads (and associated done callbacks) to 94 // Close waits for any pending uploads (and associated done callbacks) to
86 // complete, and returns the first encountered error if any. 95 // complete, and returns the first encountered error if any.
87 // Uploader cannot be used once it is closed. 96 // Uploader cannot be used once it is closed.
88 func (u *Uploader) Close() error { 97 func (u *ConcurrentUploader) Close() error {
89 u.wg.Wait() 98 u.wg.Wait()
90 close(u.waitc) // Sanity check that we don't do any more uploading. 99 close(u.waitc) // Sanity check that we don't do any more uploading.
91 return u.err 100 return u.err
92 } 101 }
93 102
94 func (u *Uploader) upload(name string, src isolatedclient.Source, ps *isolatedcl ient.PushState, done func()) { 103 func (u *ConcurrentUploader) upload(name string, src isolatedclient.Source, ps * isolatedclient.PushState, done func()) {
95 u.waitc <- true 104 u.waitc <- true
96 defer func() { 105 defer func() {
97 <-u.waitc 106 <-u.waitc
98 }() 107 }()
99 defer u.wg.Done() 108 defer u.wg.Done()
100 if done != nil { 109 if done != nil {
101 defer done() 110 defer done()
102 } 111 }
103 112
104 // Bail out early if there already was an error. 113 // Bail out early if there already was an error.
105 if u.getErr() != nil { 114 if u.getErr() != nil {
106 log.Printf("WARNING dropped %q from Uploader", name) 115 log.Printf("WARNING dropped %q from Uploader", name)
107 return 116 return
108 } 117 }
109 118
110 err := u.svc.Push(u.ctx, ps, src) 119 err := u.svc.Push(u.ctx, ps, src)
111 if err != nil { 120 if err != nil {
112 u.setErr(fmt.Errorf("pushing %q: %v", name, err)) 121 u.setErr(fmt.Errorf("pushing %q: %v", name, err))
113 } 122 }
114 } 123 }
115 124
116 func (u *Uploader) getErr() error { 125 func (u *ConcurrentUploader) getErr() error {
117 u.errMu.Lock() 126 u.errMu.Lock()
118 defer u.errMu.Unlock() 127 defer u.errMu.Unlock()
119 return u.err 128 return u.err
120 } 129 }
121 130
122 func (u *Uploader) setErr(err error) { 131 func (u *ConcurrentUploader) setErr(err error) {
123 u.errMu.Lock() 132 u.errMu.Lock()
124 defer u.errMu.Unlock() 133 defer u.errMu.Unlock()
125 if u.err == nil { 134 if u.err == nil {
126 u.err = err 135 u.err = err
127 } 136 }
128 } 137 }
129 138
130 func byteSource(b []byte) isolatedclient.Source { 139 func byteSource(b []byte) isolatedclient.Source {
131 return func() (io.ReadCloser, error) { 140 return func() (io.ReadCloser, error) {
132 return ioutil.NopCloser(bytes.NewReader(b)), nil 141 return ioutil.NopCloser(bytes.NewReader(b)), nil
133 } 142 }
134 } 143 }
135 144
136 func fileSource(path string) isolatedclient.Source { 145 func fileSource(path string) isolatedclient.Source {
137 return func() (io.ReadCloser, error) { 146 return func() (io.ReadCloser, error) {
138 return osOpen(path) 147 return osOpen(path)
139 } 148 }
140 } 149 }
OLDNEW
« no previous file with comments | « client/cmd/isolate/upload_tracker_test.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698