| OLD | NEW | 
|    1 // Copyright 2015 The Chromium Authors. All rights reserved. |    1 // Copyright 2015 The Chromium Authors. All rights reserved. | 
|    2 // Use of this source code is governed by a BSD-style license that can be |    2 // Use of this source code is governed by a BSD-style license that can be | 
|    3 // found in the LICENSE file. |    3 // found in the LICENSE file. | 
|    4  |    4  | 
|    5 package isolatedclient |    5 package isolatedclient | 
|    6  |    6  | 
|    7 import ( |    7 import ( | 
 |    8         "errors" | 
|    8         "io" |    9         "io" | 
|    9         "io/ioutil" |   10         "io/ioutil" | 
|   10         "net/http" |   11         "net/http" | 
|   11         "strconv" |   12         "strconv" | 
 |   13         "strings" | 
|   12  |   14  | 
|   13 »       "github.com/luci/luci-go/client/internal/common" |   15 »       "github.com/luci/luci-go/client/internal/lhttp" | 
 |   16 »       "github.com/luci/luci-go/client/internal/retry" | 
|   14         "github.com/luci/luci-go/client/internal/tracer" |   17         "github.com/luci/luci-go/client/internal/tracer" | 
|   15         "github.com/luci/luci-go/common/isolated" |   18         "github.com/luci/luci-go/common/isolated" | 
|   16 ) |   19 ) | 
|   17  |   20  | 
|   18 // IsolateServer is the low-level client interface to interact with an Isolate |   21 // IsolateServer is the low-level client interface to interact with an Isolate | 
|   19 // server. |   22 // server. | 
|   20 type IsolateServer interface { |   23 type IsolateServer interface { | 
|   21         ServerCapabilities() (*isolated.ServerCapabilities, error) |   24         ServerCapabilities() (*isolated.ServerCapabilities, error) | 
|   22         // Contains looks up cache presence on the server of multiple items. |   25         // Contains looks up cache presence on the server of multiple items. | 
|   23         // |   26         // | 
|   24         // The returned list is in the same order as 'items', with entries nil f
     or |   27         // The returned list is in the same order as 'items', with entries nil f
     or | 
|   25         // items that were present. |   28         // items that were present. | 
|   26         Contains(items []*isolated.DigestItem) ([]*PushState, error) |   29         Contains(items []*isolated.DigestItem) ([]*PushState, error) | 
|   27         Push(state *PushState, src io.Reader) error |   30         Push(state *PushState, src io.Reader) error | 
|   28 } |   31 } | 
|   29  |   32  | 
|   30 // PushState is per-item state passed from IsolateServer.Contains() to |   33 // PushState is per-item state passed from IsolateServer.Contains() to | 
|   31 // IsolateServer.Push(). |   34 // IsolateServer.Push(). | 
|   32 // |   35 // | 
|   33 // Its content is implementation specific. |   36 // Its content is implementation specific. | 
|   34 type PushState struct { |   37 type PushState struct { | 
|   35         status    isolated.PreuploadStatus |   38         status    isolated.PreuploadStatus | 
|   36         size      int64 |   39         size      int64 | 
|   37         uploaded  bool |   40         uploaded  bool | 
|   38         finalized bool |   41         finalized bool | 
|   39 } |   42 } | 
|   40  |   43  | 
|   41 // New returns a new IsolateServer client. |   44 // New returns a new IsolateServer client. | 
|   42 func New(url, namespace string) IsolateServer { |   45 func New(host, namespace string) IsolateServer { | 
|   43         i := &isolateServer{ |   46         i := &isolateServer{ | 
|   44 »       »       url:       url, |   47 »       »       url:       strings.TrimRight(host, "/"), | 
|   45                 namespace: namespace, |   48                 namespace: namespace, | 
|   46         } |   49         } | 
|   47 »       tracer.NewTID(i, nil, url) |   50 »       tracer.NewTID(i, nil, i.url) | 
|   48         return i |   51         return i | 
|   49 } |   52 } | 
|   50  |   53  | 
|   51 // Private details. |   54 // Private details. | 
|   52  |   55  | 
|   53 type isolateServer struct { |   56 type isolateServer struct { | 
|   54         url       string |   57         url       string | 
|   55         namespace string |   58         namespace string | 
|   56 } |   59 } | 
|   57  |   60  | 
 |   61 func (i *isolateServer) postJSON(resource string, in, out interface{}) error { | 
 |   62         if len(resource) == 0 || resource[0] != '/' { | 
 |   63                 return errors.New("resource must start with '/'") | 
 |   64         } | 
 |   65         _, err := lhttp.PostJSON(retry.Default, http.DefaultClient, i.url+resour
     ce, in, out) | 
 |   66         return err | 
 |   67 } | 
 |   68  | 
|   58 func (i *isolateServer) ServerCapabilities() (*isolated.ServerCapabilities, erro
     r) { |   69 func (i *isolateServer) ServerCapabilities() (*isolated.ServerCapabilities, erro
     r) { | 
|   59         url := i.url + "/_ah/api/isolateservice/v1/server_details" |  | 
|   60         out := &isolated.ServerCapabilities{} |   70         out := &isolated.ServerCapabilities{} | 
|   61 »       if _, err := common.PostJSON(nil, url, nil, out); err != nil { |   71 »       if err := i.postJSON("/_ah/api/isolateservice/v1/server_details", map[st
     ring]string{}, out); err != nil { | 
|   62                 return nil, err |   72                 return nil, err | 
|   63         } |   73         } | 
|   64         return out, nil |   74         return out, nil | 
|   65 } |   75 } | 
|   66  |   76  | 
|   67 func (i *isolateServer) Contains(items []*isolated.DigestItem) (out []*PushState
     , err error) { |   77 func (i *isolateServer) Contains(items []*isolated.DigestItem) (out []*PushState
     , err error) { | 
|   68         end := tracer.Span(i, "contains", strconv.Itoa(len(items)), nil) |   78         end := tracer.Span(i, "contains", strconv.Itoa(len(items)), nil) | 
|   69         defer func() { end(tracer.Args{"err": err}) }() |   79         defer func() { end(tracer.Args{"err": err}) }() | 
|   70         in := isolated.DigestCollection{Items: items} |   80         in := isolated.DigestCollection{Items: items} | 
|   71         in.Namespace.Namespace = i.namespace |   81         in.Namespace.Namespace = i.namespace | 
|   72         data := &isolated.UrlCollection{} |   82         data := &isolated.UrlCollection{} | 
|   73 »       url := i.url + "/_ah/api/isolateservice/v1/preupload" |   83 »       if err = i.postJSON("/_ah/api/isolateservice/v1/preupload", in, data); e
     rr != nil { | 
|   74 »       if _, err = common.PostJSON(nil, url, in, data); err != nil { |  | 
|   75                 return nil, err |   84                 return nil, err | 
|   76         } |   85         } | 
|   77         out = make([]*PushState, len(items)) |   86         out = make([]*PushState, len(items)) | 
|   78         for _, e := range data.Items { |   87         for _, e := range data.Items { | 
|   79                 index := int(e.Index) |   88                 index := int(e.Index) | 
|   80                 out[index] = &PushState{ |   89                 out[index] = &PushState{ | 
|   81                         status: e, |   90                         status: e, | 
|   82                         size:   items[index].Size, |   91                         size:   items[index].Size, | 
|   83                 } |   92                 } | 
|   84         } |   93         } | 
| (...skipping 13 matching lines...) Expand all  Loading... | 
|   98  |  107  | 
|   99         // Optionally notify the server that it's done. |  108         // Optionally notify the server that it's done. | 
|  100         if state.status.GSUploadURL != "" { |  109         if state.status.GSUploadURL != "" { | 
|  101                 end := tracer.Span(i, "finalize", "finalize", nil) |  110                 end := tracer.Span(i, "finalize", "finalize", nil) | 
|  102                 defer func() { end(tracer.Args{"err": err}) }() |  111                 defer func() { end(tracer.Args{"err": err}) }() | 
|  103                 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a 
     file and |  112                 // TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a 
     file and | 
|  104                 // send it to isolated server. That way isolate server can verif
     y that |  113                 // send it to isolated server. That way isolate server can verif
     y that | 
|  105                 // the data safely reached Google Storage (GS provides MD5 and C
     RC32C of |  114                 // the data safely reached Google Storage (GS provides MD5 and C
     RC32C of | 
|  106                 // stored files). |  115                 // stored files). | 
|  107                 in := isolated.FinalizeRequest{state.status.UploadTicket} |  116                 in := isolated.FinalizeRequest{state.status.UploadTicket} | 
|  108 »       »       url := i.url + "/_ah/api/isolateservice/v1/finalize_gs_upload" |  117 »       »       if err = i.postJSON("/_ah/api/isolateservice/v1/finalize_gs_uplo
     ad", in, nil); err != nil { | 
|  109 »       »       _, err = common.PostJSON(nil, url, in, nil) |  | 
|  110 »       »       if err != nil { |  | 
|  111                         return |  118                         return | 
|  112                 } |  119                 } | 
|  113         } |  120         } | 
|  114         state.finalized = true |  121         state.finalized = true | 
|  115         return |  122         return | 
|  116 } |  123 } | 
|  117  |  124  | 
|  118 func (i *isolateServer) doPush(state *PushState, src io.Reader) (err error) { |  125 func (i *isolateServer) doPush(state *PushState, src io.Reader) (err error) { | 
|  119         end := tracer.Span(i, "push", strconv.FormatInt(state.size, 10), nil) |  126         end := tracer.Span(i, "push", strconv.FormatInt(state.size, 10), nil) | 
|  120         defer func() { end(tracer.Args{"err": err}) }() |  127         defer func() { end(tracer.Args{"err": err}) }() | 
| (...skipping 11 matching lines...) Expand all  Loading... | 
|  132         }() |  139         }() | 
|  133         defer func() { |  140         defer func() { | 
|  134                 err4 := <-c |  141                 err4 := <-c | 
|  135                 if err == nil { |  142                 if err == nil { | 
|  136                         err = err4 |  143                         err = err4 | 
|  137                 } |  144                 } | 
|  138         }() |  145         }() | 
|  139  |  146  | 
|  140         // DB upload. |  147         // DB upload. | 
|  141         if state.status.GSUploadURL == "" { |  148         if state.status.GSUploadURL == "" { | 
|  142                 url := i.url + "/_ah/api/isolateservice/v1/store_inline" |  | 
|  143                 content, err2 := ioutil.ReadAll(reader) |  149                 content, err2 := ioutil.ReadAll(reader) | 
|  144                 if err2 != nil { |  150                 if err2 != nil { | 
|  145                         return err2 |  151                         return err2 | 
|  146                 } |  152                 } | 
|  147                 in := &isolated.StorageRequest{state.status.UploadTicket, conten
     t} |  153                 in := &isolated.StorageRequest{state.status.UploadTicket, conten
     t} | 
|  148 »       »       _, err = common.PostJSON(nil, url, in, nil) |  154 »       »       return i.postJSON("/_ah/api/isolateservice/v1/store_inline", in,
      nil) | 
|  149 »       »       return |  | 
|  150         } |  155         } | 
|  151  |  156  | 
|  152         // Upload to GCS. |  157         // Upload to GCS. | 
|  153         client := &http.Client{} |  158         client := &http.Client{} | 
|  154         request, err5 := http.NewRequest("PUT", state.status.GSUploadURL, reader
     ) |  159         request, err5 := http.NewRequest("PUT", state.status.GSUploadURL, reader
     ) | 
|  155         if err5 != nil { |  160         if err5 != nil { | 
|  156                 return err5 |  161                 return err5 | 
|  157         } |  162         } | 
|  158         request.Header.Set("Content-Type", "application/octet-stream") |  163         request.Header.Set("Content-Type", "application/octet-stream") | 
|  159         // TODO(maruel): For relatively small file, set request.ContentLength so
      the |  164         // TODO(maruel): For relatively small file, set request.ContentLength so
      the | 
|  160         // TCP connection can be reused. |  165         // TCP connection can be reused. | 
|  161         resp, err6 := client.Do(request) |  166         resp, err6 := client.Do(request) | 
|  162         if err6 != nil { |  167         if err6 != nil { | 
|  163                 return err6 |  168                 return err6 | 
|  164         } |  169         } | 
|  165         _, err = io.Copy(ioutil.Discard, resp.Body) |  170         _, err = io.Copy(ioutil.Discard, resp.Body) | 
|  166         resp.Body.Close() |  171         resp.Body.Close() | 
|  167         return |  172         return | 
|  168 } |  173 } | 
| OLD | NEW |