Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(135)

Side by Side Diff: client/isolatedclient/isolatedclient.go

Issue 1135173003: Create packages client/internal/ retry and lhttp. (Closed) Base URL: git@github.com:luci/luci-go@3_UI
Patch Set: . Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package isolatedclient 5 package isolatedclient
6 6
7 import ( 7 import (
8 "errors"
8 "io" 9 "io"
9 "io/ioutil" 10 "io/ioutil"
10 "net/http" 11 "net/http"
11 "strconv" 12 "strconv"
13 "strings"
12 14
13 » "github.com/luci/luci-go/client/internal/common" 15 » "github.com/luci/luci-go/client/internal/lhttp"
16 » "github.com/luci/luci-go/client/internal/retry"
14 "github.com/luci/luci-go/client/internal/tracer" 17 "github.com/luci/luci-go/client/internal/tracer"
15 "github.com/luci/luci-go/common/isolated" 18 "github.com/luci/luci-go/common/isolated"
16 ) 19 )
17 20
18 // IsolateServer is the low-level client interface to interact with an Isolate 21 // IsolateServer is the low-level client interface to interact with an Isolate
19 // server. 22 // server.
20 type IsolateServer interface { 23 type IsolateServer interface {
21 ServerCapabilities() (*isolated.ServerCapabilities, error) 24 ServerCapabilities() (*isolated.ServerCapabilities, error)
22 // Contains looks up cache presence on the server of multiple items. 25 // Contains looks up cache presence on the server of multiple items.
23 // 26 //
24 // The returned list is in the same order as 'items', with entries nil f or 27 // The returned list is in the same order as 'items', with entries nil f or
25 // items that were present. 28 // items that were present.
26 Contains(items []*isolated.DigestItem) ([]*PushState, error) 29 Contains(items []*isolated.DigestItem) ([]*PushState, error)
27 Push(state *PushState, src io.Reader) error 30 Push(state *PushState, src io.Reader) error
28 } 31 }
29 32
30 // PushState is per-item state passed from IsolateServer.Contains() to 33 // PushState is per-item state passed from IsolateServer.Contains() to
31 // IsolateServer.Push(). 34 // IsolateServer.Push().
32 // 35 //
33 // Its content is implementation specific. 36 // Its content is implementation specific.
34 type PushState struct { 37 type PushState struct {
35 status isolated.PreuploadStatus 38 status isolated.PreuploadStatus
36 size int64 39 size int64
37 uploaded bool 40 uploaded bool
38 finalized bool 41 finalized bool
39 } 42 }
40 43
41 // New returns a new IsolateServer client. 44 // New returns a new IsolateServer client.
42 func New(url, namespace string) IsolateServer { 45 func New(host, namespace string) IsolateServer {
43 i := &isolateServer{ 46 i := &isolateServer{
44 » » url: url, 47 » » url: strings.TrimRight(host, "/"),
45 namespace: namespace, 48 namespace: namespace,
46 } 49 }
47 » tracer.NewTID(i, nil, url) 50 » tracer.NewTID(i, nil, i.url)
48 return i 51 return i
49 } 52 }
50 53
51 // Private details. 54 // Private details.
52 55
53 type isolateServer struct { 56 type isolateServer struct {
54 url string 57 url string
55 namespace string 58 namespace string
56 } 59 }
57 60
61 func (i *isolateServer) postJSON(resource string, in, out interface{}) error {
62 if len(resource) == 0 || resource[0] != '/' {
63 return errors.New("resource must start with '/'")
64 }
65 return lhttp.PostJSON(retry.Default, http.DefaultClient, i.url+resource, in, out)
Vadim Sh. 2015/05/13 21:54:48 let's put http.Client and retry.Config into isolat
66 }
67
58 func (i *isolateServer) ServerCapabilities() (*isolated.ServerCapabilities, erro r) { 68 func (i *isolateServer) ServerCapabilities() (*isolated.ServerCapabilities, erro r) {
59 url := i.url + "/_ah/api/isolateservice/v1/server_details"
60 out := &isolated.ServerCapabilities{} 69 out := &isolated.ServerCapabilities{}
61 » if _, err := common.PostJSON(nil, url, nil, out); err != nil { 70 » if err := i.postJSON("/_ah/api/isolateservice/v1/server_details", map[st ring]string{}, out); err != nil {
62 return nil, err 71 return nil, err
63 } 72 }
64 return out, nil 73 return out, nil
65 } 74 }
66 75
67 func (i *isolateServer) Contains(items []*isolated.DigestItem) (out []*PushState , err error) { 76 func (i *isolateServer) Contains(items []*isolated.DigestItem) (out []*PushState , err error) {
68 end := tracer.Span(i, "contains", strconv.Itoa(len(items)), nil) 77 end := tracer.Span(i, "contains", strconv.Itoa(len(items)), nil)
69 defer func() { end(tracer.Args{"err": err}) }() 78 defer func() { end(tracer.Args{"err": err}) }()
70 in := isolated.DigestCollection{Items: items} 79 in := isolated.DigestCollection{Items: items}
71 in.Namespace.Namespace = i.namespace 80 in.Namespace.Namespace = i.namespace
72 data := &isolated.UrlCollection{} 81 data := &isolated.UrlCollection{}
73 » url := i.url + "/_ah/api/isolateservice/v1/preupload" 82 » if err = i.postJSON("/_ah/api/isolateservice/v1/preupload", in, data); e rr != nil {
74 » if _, err = common.PostJSON(nil, url, in, data); err != nil {
75 return nil, err 83 return nil, err
76 } 84 }
77 out = make([]*PushState, len(items)) 85 out = make([]*PushState, len(items))
78 for _, e := range data.Items { 86 for _, e := range data.Items {
79 index := int(e.Index) 87 index := int(e.Index)
80 out[index] = &PushState{ 88 out[index] = &PushState{
81 status: e, 89 status: e,
82 size: items[index].Size, 90 size: items[index].Size,
83 } 91 }
84 } 92 }
(...skipping 13 matching lines...) Expand all
98 106
99 // Optionally notify the server that it's done. 107 // Optionally notify the server that it's done.
100 if state.status.GSUploadURL != "" { 108 if state.status.GSUploadURL != "" {
101 end := tracer.Span(i, "finalize", "finalize", nil) 109 end := tracer.Span(i, "finalize", "finalize", nil)
102 defer func() { end(tracer.Args{"err": err}) }() 110 defer func() { end(tracer.Args{"err": err}) }()
103 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and 111 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
104 // send it to isolated server. That way isolate server can verif y that 112 // send it to isolated server. That way isolate server can verif y that
105 // the data safely reached Google Storage (GS provides MD5 and C RC32C of 113 // the data safely reached Google Storage (GS provides MD5 and C RC32C of
106 // stored files). 114 // stored files).
107 in := isolated.FinalizeRequest{state.status.UploadTicket} 115 in := isolated.FinalizeRequest{state.status.UploadTicket}
108 » » url := i.url + "/_ah/api/isolateservice/v1/finalize_gs_upload" 116 » » if err = i.postJSON("/_ah/api/isolateservice/v1/finalize_gs_uplo ad", in, nil); err != nil {
109 » » _, err = common.PostJSON(nil, url, in, nil)
110 » » if err != nil {
111 return 117 return
112 } 118 }
113 } 119 }
114 state.finalized = true 120 state.finalized = true
115 return 121 return
116 } 122 }
117 123
118 func (i *isolateServer) doPush(state *PushState, src io.Reader) (err error) { 124 func (i *isolateServer) doPush(state *PushState, src io.Reader) (err error) {
119 end := tracer.Span(i, "push", strconv.FormatInt(state.size, 10), nil) 125 end := tracer.Span(i, "push", strconv.FormatInt(state.size, 10), nil)
120 defer func() { end(tracer.Args{"err": err}) }() 126 defer func() { end(tracer.Args{"err": err}) }()
(...skipping 11 matching lines...) Expand all
132 }() 138 }()
133 defer func() { 139 defer func() {
134 err4 := <-c 140 err4 := <-c
135 if err == nil { 141 if err == nil {
136 err = err4 142 err = err4
137 } 143 }
138 }() 144 }()
139 145
140 // DB upload. 146 // DB upload.
141 if state.status.GSUploadURL == "" { 147 if state.status.GSUploadURL == "" {
142 url := i.url + "/_ah/api/isolateservice/v1/store_inline"
143 content, err2 := ioutil.ReadAll(reader) 148 content, err2 := ioutil.ReadAll(reader)
144 if err2 != nil { 149 if err2 != nil {
145 return err2 150 return err2
146 } 151 }
147 in := &isolated.StorageRequest{state.status.UploadTicket, conten t} 152 in := &isolated.StorageRequest{state.status.UploadTicket, conten t}
148 » » _, err = common.PostJSON(nil, url, in, nil) 153 » » return i.postJSON("/_ah/api/isolateservice/v1/store_inline", in, nil)
149 » » return
150 } 154 }
151 155
152 // Upload to GCS. 156 // Upload to GCS.
153 client := &http.Client{} 157 client := &http.Client{}
154 request, err5 := http.NewRequest("PUT", state.status.GSUploadURL, reader ) 158 request, err5 := http.NewRequest("PUT", state.status.GSUploadURL, reader )
155 if err5 != nil { 159 if err5 != nil {
156 return err5 160 return err5
157 } 161 }
158 request.Header.Set("Content-Type", "application/octet-stream") 162 request.Header.Set("Content-Type", "application/octet-stream")
159 // TODO(maruel): For relatively small file, set request.ContentLength so the 163 // TODO(maruel): For relatively small file, set request.ContentLength so the
160 // TCP connection can be reused. 164 // TCP connection can be reused.
161 resp, err6 := client.Do(request) 165 resp, err6 := client.Do(request)
162 if err6 != nil { 166 if err6 != nil {
163 return err6 167 return err6
164 } 168 }
165 _, err = io.Copy(ioutil.Discard, resp.Body) 169 _, err = io.Copy(ioutil.Discard, resp.Body)
166 resp.Body.Close() 170 resp.Body.Close()
167 return 171 return
168 } 172 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698