| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package isolatedclient | 5 package isolatedclient |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" |
| 8 "io" | 9 "io" |
| 9 "io/ioutil" | 10 "io/ioutil" |
| 10 "net/http" | 11 "net/http" |
| 11 "strconv" | 12 "strconv" |
| 13 "strings" |
| 12 | 14 |
| 13 » "github.com/luci/luci-go/client/internal/common" | 15 » "github.com/luci/luci-go/client/internal/lhttp" |
| 16 » "github.com/luci/luci-go/client/internal/retry" |
| 14 "github.com/luci/luci-go/client/internal/tracer" | 17 "github.com/luci/luci-go/client/internal/tracer" |
| 15 "github.com/luci/luci-go/common/isolated" | 18 "github.com/luci/luci-go/common/isolated" |
| 16 ) | 19 ) |
| 17 | 20 |
| 18 // IsolateServer is the low-level client interface to interact with an Isolate | 21 // IsolateServer is the low-level client interface to interact with an Isolate |
| 19 // server. | 22 // server. |
| 20 type IsolateServer interface { | 23 type IsolateServer interface { |
| 21 ServerCapabilities() (*isolated.ServerCapabilities, error) | 24 ServerCapabilities() (*isolated.ServerCapabilities, error) |
| 22 // Contains looks up cache presence on the server of multiple items. | 25 // Contains looks up cache presence on the server of multiple items. |
| 23 // | 26 // |
| 24 // The returned list is in the same order as 'items', with entries nil f
or | 27 // The returned list is in the same order as 'items', with entries nil f
or |
| 25 // items that were present. | 28 // items that were present. |
| 26 Contains(items []*isolated.DigestItem) ([]*PushState, error) | 29 Contains(items []*isolated.DigestItem) ([]*PushState, error) |
| 27 Push(state *PushState, src io.Reader) error | 30 Push(state *PushState, src io.Reader) error |
| 28 } | 31 } |
| 29 | 32 |
| 30 // PushState is per-item state passed from IsolateServer.Contains() to | 33 // PushState is per-item state passed from IsolateServer.Contains() to |
| 31 // IsolateServer.Push(). | 34 // IsolateServer.Push(). |
| 32 // | 35 // |
| 33 // Its content is implementation specific. | 36 // Its content is implementation specific. |
| 34 type PushState struct { | 37 type PushState struct { |
| 35 status isolated.PreuploadStatus | 38 status isolated.PreuploadStatus |
| 36 size int64 | 39 size int64 |
| 37 uploaded bool | 40 uploaded bool |
| 38 finalized bool | 41 finalized bool |
| 39 } | 42 } |
| 40 | 43 |
| 41 // New returns a new IsolateServer client. | 44 // New returns a new IsolateServer client. |
| 42 func New(url, namespace string) IsolateServer { | 45 func New(host, namespace string) IsolateServer { |
| 43 i := &isolateServer{ | 46 i := &isolateServer{ |
| 44 » » url: url, | 47 » » url: strings.TrimRight(host, "/"), |
| 45 namespace: namespace, | 48 namespace: namespace, |
| 46 } | 49 } |
| 47 » tracer.NewTID(i, nil, url) | 50 » tracer.NewTID(i, nil, i.url) |
| 48 return i | 51 return i |
| 49 } | 52 } |
| 50 | 53 |
| 51 // Private details. | 54 // Private details. |
| 52 | 55 |
| 53 type isolateServer struct { | 56 type isolateServer struct { |
| 54 url string | 57 url string |
| 55 namespace string | 58 namespace string |
| 56 } | 59 } |
| 57 | 60 |
| 61 func (i *isolateServer) postJSON(resource string, in, out interface{}) error { |
| 62 if len(resource) == 0 || resource[0] != '/' { |
| 63 return errors.New("resource must start with '/'") |
| 64 } |
| 65 _, err := lhttp.PostJSON(retry.Default, http.DefaultClient, i.url+resour
ce, in, out) |
| 66 return err |
| 67 } |
| 68 |
| 58 func (i *isolateServer) ServerCapabilities() (*isolated.ServerCapabilities, erro
r) { | 69 func (i *isolateServer) ServerCapabilities() (*isolated.ServerCapabilities, erro
r) { |
| 59 url := i.url + "/_ah/api/isolateservice/v1/server_details" | |
| 60 out := &isolated.ServerCapabilities{} | 70 out := &isolated.ServerCapabilities{} |
| 61 » if _, err := common.PostJSON(nil, url, nil, out); err != nil { | 71 » if err := i.postJSON("/_ah/api/isolateservice/v1/server_details", map[st
ring]string{}, out); err != nil { |
| 62 return nil, err | 72 return nil, err |
| 63 } | 73 } |
| 64 return out, nil | 74 return out, nil |
| 65 } | 75 } |
| 66 | 76 |
| 67 func (i *isolateServer) Contains(items []*isolated.DigestItem) (out []*PushState
, err error) { | 77 func (i *isolateServer) Contains(items []*isolated.DigestItem) (out []*PushState
, err error) { |
| 68 end := tracer.Span(i, "contains", strconv.Itoa(len(items)), nil) | 78 end := tracer.Span(i, "contains", strconv.Itoa(len(items)), nil) |
| 69 defer func() { end(tracer.Args{"err": err}) }() | 79 defer func() { end(tracer.Args{"err": err}) }() |
| 70 in := isolated.DigestCollection{Items: items} | 80 in := isolated.DigestCollection{Items: items} |
| 71 in.Namespace.Namespace = i.namespace | 81 in.Namespace.Namespace = i.namespace |
| 72 data := &isolated.UrlCollection{} | 82 data := &isolated.UrlCollection{} |
| 73 » url := i.url + "/_ah/api/isolateservice/v1/preupload" | 83 » if err = i.postJSON("/_ah/api/isolateservice/v1/preupload", in, data); e
rr != nil { |
| 74 » if _, err = common.PostJSON(nil, url, in, data); err != nil { | |
| 75 return nil, err | 84 return nil, err |
| 76 } | 85 } |
| 77 out = make([]*PushState, len(items)) | 86 out = make([]*PushState, len(items)) |
| 78 for _, e := range data.Items { | 87 for _, e := range data.Items { |
| 79 index := int(e.Index) | 88 index := int(e.Index) |
| 80 out[index] = &PushState{ | 89 out[index] = &PushState{ |
| 81 status: e, | 90 status: e, |
| 82 size: items[index].Size, | 91 size: items[index].Size, |
| 83 } | 92 } |
| 84 } | 93 } |
| (...skipping 13 matching lines...) Expand all Loading... |
| 98 | 107 |
| 99 // Optionally notify the server that it's done. | 108 // Optionally notify the server that it's done. |
| 100 if state.status.GSUploadURL != "" { | 109 if state.status.GSUploadURL != "" { |
| 101 end := tracer.Span(i, "finalize", "finalize", nil) | 110 end := tracer.Span(i, "finalize", "finalize", nil) |
| 102 defer func() { end(tracer.Args{"err": err}) }() | 111 defer func() { end(tracer.Args{"err": err}) }() |
| 103 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a
file and | 112 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a
file and |
| 104 // send it to isolated server. That way isolate server can verif
y that | 113 // send it to isolated server. That way isolate server can verif
y that |
| 105 // the data safely reached Google Storage (GS provides MD5 and C
RC32C of | 114 // the data safely reached Google Storage (GS provides MD5 and C
RC32C of |
| 106 // stored files). | 115 // stored files). |
| 107 in := isolated.FinalizeRequest{state.status.UploadTicket} | 116 in := isolated.FinalizeRequest{state.status.UploadTicket} |
| 108 » » url := i.url + "/_ah/api/isolateservice/v1/finalize_gs_upload" | 117 » » if err = i.postJSON("/_ah/api/isolateservice/v1/finalize_gs_uplo
ad", in, nil); err != nil { |
| 109 » » _, err = common.PostJSON(nil, url, in, nil) | |
| 110 » » if err != nil { | |
| 111 return | 118 return |
| 112 } | 119 } |
| 113 } | 120 } |
| 114 state.finalized = true | 121 state.finalized = true |
| 115 return | 122 return |
| 116 } | 123 } |
| 117 | 124 |
| 118 func (i *isolateServer) doPush(state *PushState, src io.Reader) (err error) { | 125 func (i *isolateServer) doPush(state *PushState, src io.Reader) (err error) { |
| 119 end := tracer.Span(i, "push", strconv.FormatInt(state.size, 10), nil) | 126 end := tracer.Span(i, "push", strconv.FormatInt(state.size, 10), nil) |
| 120 defer func() { end(tracer.Args{"err": err}) }() | 127 defer func() { end(tracer.Args{"err": err}) }() |
| (...skipping 11 matching lines...) Expand all Loading... |
| 132 }() | 139 }() |
| 133 defer func() { | 140 defer func() { |
| 134 err4 := <-c | 141 err4 := <-c |
| 135 if err == nil { | 142 if err == nil { |
| 136 err = err4 | 143 err = err4 |
| 137 } | 144 } |
| 138 }() | 145 }() |
| 139 | 146 |
| 140 // DB upload. | 147 // DB upload. |
| 141 if state.status.GSUploadURL == "" { | 148 if state.status.GSUploadURL == "" { |
| 142 url := i.url + "/_ah/api/isolateservice/v1/store_inline" | |
| 143 content, err2 := ioutil.ReadAll(reader) | 149 content, err2 := ioutil.ReadAll(reader) |
| 144 if err2 != nil { | 150 if err2 != nil { |
| 145 return err2 | 151 return err2 |
| 146 } | 152 } |
| 147 in := &isolated.StorageRequest{state.status.UploadTicket, conten
t} | 153 in := &isolated.StorageRequest{state.status.UploadTicket, conten
t} |
| 148 » » _, err = common.PostJSON(nil, url, in, nil) | 154 » » return i.postJSON("/_ah/api/isolateservice/v1/store_inline", in,
nil) |
| 149 » » return | |
| 150 } | 155 } |
| 151 | 156 |
| 152 // Upload to GCS. | 157 // Upload to GCS. |
| 153 client := &http.Client{} | 158 client := &http.Client{} |
| 154 request, err5 := http.NewRequest("PUT", state.status.GSUploadURL, reader
) | 159 request, err5 := http.NewRequest("PUT", state.status.GSUploadURL, reader
) |
| 155 if err5 != nil { | 160 if err5 != nil { |
| 156 return err5 | 161 return err5 |
| 157 } | 162 } |
| 158 request.Header.Set("Content-Type", "application/octet-stream") | 163 request.Header.Set("Content-Type", "application/octet-stream") |
| 159 // TODO(maruel): For relatively small file, set request.ContentLength so
the | 164 // TODO(maruel): For relatively small file, set request.ContentLength so
the |
| 160 // TCP connection can be reused. | 165 // TCP connection can be reused. |
| 161 resp, err6 := client.Do(request) | 166 resp, err6 := client.Do(request) |
| 162 if err6 != nil { | 167 if err6 != nil { |
| 163 return err6 | 168 return err6 |
| 164 } | 169 } |
| 165 _, err = io.Copy(ioutil.Discard, resp.Body) | 170 _, err = io.Copy(ioutil.Discard, resp.Body) |
| 166 resp.Body.Close() | 171 resp.Body.Close() |
| 167 return | 172 return |
| 168 } | 173 } |
| OLD | NEW |