Chromium Code Reviews| Index: go/src/infra/tools/cipd/remote.go |
| diff --git a/go/src/infra/tools/cipd/remote.go b/go/src/infra/tools/cipd/remote.go |
| index 627fbba0def1f45319ce5fa4c6f640642e99378a..ef46e2a1efb08b6e03d3bbe27eb07886800e7d4d 100644 |
| --- a/go/src/infra/tools/cipd/remote.go |
| +++ b/go/src/infra/tools/cipd/remote.go |
| @@ -16,23 +16,12 @@ import ( |
| "strconv" |
| "time" |
| - "infra/libs/logging" |
| + "infra/tools/cipd/common" |
| ) |
| -// remoteService is a wrapper around Cloud Endpoints APIs exposed by backend. |
| -// See //appengine/chrome_infra_packages. |
| -type remoteService struct { |
| - client *http.Client |
| - serviceURL string |
| - log logging.Logger |
| -} |
| - |
| -type uploadSession struct { |
| - ID string |
| - URL string |
| -} |
| +// remoteMaxRetries is how many times to retry transient HTTP errors. |
| +const remoteMaxRetries = 10 |
| -// packageInstanceMsg corresponds to PackageInstance message on the backend. |
| type packageInstanceMsg struct { |
| PackageName string `json:"package_name"` |
| InstanceID string `json:"instance_id"` |
| @@ -40,26 +29,6 @@ type packageInstanceMsg struct { |
| RegisteredTs string `json:"registered_ts"` |
| } |
| -// packageInstance is almost like packageInstanceMsg, but with timestamp as |
| -// time.Time. See also makePackageInstanceInfo. |
| -type packageInstanceInfo struct { |
| - PackageName string |
| - InstanceID string |
| - RegisteredBy string |
| - RegisteredTs time.Time |
| -} |
| - |
| -type registerInstanceResponse struct { |
| - UploadSession *uploadSession |
| - AlreadyRegistered bool |
| - Info packageInstanceInfo |
| -} |
| - |
| -type fetchInstanceResponse struct { |
| - FetchURL string |
| - Info packageInstanceInfo |
| -} |
| - |
| // roleChangeMsg corresponds to RoleChange proto message on backend. |
| type roleChangeMsg struct { |
| Action string `json:"action"` |
| @@ -77,32 +46,36 @@ func (e *pendingProcessingError) Error() string { |
| return e.message |
| } |
| -// newRemoteService is mocked in tests. |
| -var newRemoteService = func(client *http.Client, url string, log logging.Logger) *remoteService { |
| - log.Infof("cipd: service URL is %s", url) |
| - return &remoteService{ |
| - client: client, |
| - serviceURL: url, |
| - log: log, |
| - } |
| +// remoteImpl implements remote on top of real HTTP calls. |
| +type remoteImpl struct { |
| + client *Client |
| +} |
| + |
| +func isTemporaryNetError(err error) bool { |
| + // TODO(vadimsh): Figure out how to recognize dial timeouts, read timeouts, |
| + // etc. For now all errors that end up here are considered retryable. |
|
nodir
2015/05/13 01:25:29
english: retriable
Vadim Sh.
2015/05/13 03:08:06
Changed to "temporary", retriable is not a word, I
nodir
2015/05/13 15:54:25
http://en.wiktionary.org/wiki/retriable
|
| + return true |
| } |
| -// makeRequest sends POST or GET request with retries. |
| -func (r *remoteService) makeRequest(path, method string, request, response interface{}) error { |
| +// makeRequest sends POST or GET REST JSON requests with retries. |
| +func (r *remoteImpl) makeRequest(path, method string, request, response interface{}) error { |
| var body []byte |
| - var err error |
| if request != nil { |
| - body, err = json.Marshal(request) |
| + b, err := json.Marshal(request) |
| if err != nil { |
| return err |
| } |
| + body = b |
| } |
| - url := fmt.Sprintf("%s/_ah/api/%s", r.serviceURL, path) |
| - for attempt := 0; attempt < 10; attempt++ { |
| + |
| + url := fmt.Sprintf("%s/_ah/api/%s", r.client.ServiceURL, path) |
| + for attempt := 0; attempt < remoteMaxRetries; attempt++ { |
| if attempt != 0 { |
| - r.log.Warningf("cipd: retrying request to %s", url) |
| - clock.Sleep(2 * time.Second) |
| + r.client.Log.Warningf("cipd: retrying request to %s", url) |
| + r.client.clock.sleep(2 * time.Second) |
| } |
| + |
| + // Prepare request. |
| var bodyReader io.Reader |
| if body != nil { |
| bodyReader = bytes.NewReader(body) |
| @@ -114,29 +87,47 @@ func (r *remoteService) makeRequest(path, method string, request, response inter |
| if body != nil { |
| req.Header.Set("Content-Type", "application/json") |
| } |
| - req.Header.Set("User-Agent", userAgent()) |
| - resp, err := r.client.Do(req) |
| + req.Header.Set("User-Agent", r.client.UserAgent) |
| + |
| + // Connect, read response. |
| + r.client.Log.Debugf("cipd: %s %s", method, url) |
| + resp, err := r.client.doAuthenticatedHTTPRequest(req) |
| if err != nil { |
| + if isTemporaryNetError(err) { |
| + r.client.Log.Warningf("cipd: connectivity error (%s)", err) |
| + continue |
| + } |
| return err |
| } |
| + responseBody, err := ioutil.ReadAll(resp.Body) |
| + resp.Body.Close() |
| + if err != nil { |
| + if isTemporaryNetError(err) { |
| + r.client.Log.Warningf("cipd: temporary error when reading response (%s)", err) |
| + continue |
| + } |
| + return err |
| + } |
| + r.client.Log.Debugf("cipd: http %d: %s", resp.StatusCode, body) |
| + |
| // Success? |
| if resp.StatusCode < 300 { |
| - defer resp.Body.Close() |
| - return json.NewDecoder(resp.Body).Decode(response) |
| + return json.Unmarshal(responseBody, response) |
| } |
| + |
| // Fatal error? |
| - if resp.StatusCode >= 300 && resp.StatusCode < 500 { |
| - defer resp.Body.Close() |
| - body, _ := ioutil.ReadAll(resp.Body) |
| + if resp.StatusCode >= 300 && resp.StatusCode < 500 && resp.StatusCode != 408 { |
| + if resp.StatusCode == 403 || resp.StatusCode == 401 { |
|
nodir
2015/05/13 01:25:29
move this if out of the enclosing if
Vadim Sh.
2015/05/13 03:08:06
Done.
|
| + return ErrAccessDenined |
| + } |
| return fmt.Errorf("Unexpected reply (HTTP %d):\n%s", resp.StatusCode, string(body)) |
| } |
| - // Retry. |
| - resp.Body.Close() |
| } |
| - return fmt.Errorf("Request to %s failed after 10 attempts", url) |
| + |
| + return ErrBackendInaccessible |
| } |
| -func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error) { |
| +func (r *remoteImpl) initiateUpload(sha1 string) (s *UploadSession, err error) { |
| var reply struct { |
| Status string `json:"status"` |
| UploadSessionID string `json:"upload_session_id"` |
| @@ -151,10 +142,7 @@ func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error |
| case "ALREADY_UPLOADED": |
| return |
| case "SUCCESS": |
| - s = &uploadSession{ |
| - ID: reply.UploadSessionID, |
| - URL: reply.UploadURL, |
| - } |
| + s = &UploadSession{reply.UploadSessionID, reply.UploadURL} |
| case "ERROR": |
| err = fmt.Errorf("Server replied with error: %s", reply.ErrorMessage) |
| default: |
| @@ -163,7 +151,7 @@ func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error |
| return |
| } |
| -func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err error) { |
| +func (r *remoteImpl) finalizeUpload(sessionID string) (finished bool, err error) { |
| var reply struct { |
| Status string `json:"status"` |
| ErrorMessage string `json:"error_message"` |
| @@ -174,7 +162,7 @@ func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err err |
| } |
| switch reply.Status { |
| case "MISSING": |
| - err = errors.New("Upload session is unexpectedly missing") |
| + err = ErrUploadSessionDied |
| case "UPLOADING", "VERIFYING": |
| finished = false |
| case "PUBLISHED": |
| @@ -187,8 +175,8 @@ func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err err |
| return |
| } |
| -func (r *remoteService) registerInstance(packageName, instanceID string) (*registerInstanceResponse, error) { |
| - endpoint, err := instanceEndpoint(packageName, instanceID) |
| +func (r *remoteImpl) registerInstance(pin common.Pin) (*registerInstanceResponse, error) { |
| + endpoint, err := instanceEndpoint(pin) |
| if err != nil { |
| return nil, err |
| } |
| @@ -205,23 +193,21 @@ func (r *remoteService) registerInstance(packageName, instanceID string) (*regis |
| } |
| switch reply.Status { |
| case "REGISTERED", "ALREADY_REGISTERED": |
| - info, err := makePackageInstanceInfo(reply.Instance) |
| + ts, err := convertTimestamp(reply.Instance.RegisteredTs) |
| if err != nil { |
| return nil, err |
| } |
| return ®isterInstanceResponse{ |
| - AlreadyRegistered: reply.Status == "ALREADY_REGISTERED", |
| - Info: info, |
| + alreadyRegistered: reply.Status == "ALREADY_REGISTERED", |
| + registeredBy: reply.Instance.RegisteredBy, |
| + registeredTs: ts, |
| }, nil |
| case "UPLOAD_FIRST": |
| if reply.UploadSessionID == "" { |
| - return nil, errors.New("Server didn't provide upload session ID") |
| + return nil, ErrNoUploadSessionID |
| } |
| return ®isterInstanceResponse{ |
| - UploadSession: &uploadSession{ |
| - ID: reply.UploadSessionID, |
| - URL: reply.UploadURL, |
| - }, |
| + uploadSession: &UploadSession{reply.UploadSessionID, reply.UploadURL}, |
| }, nil |
| case "ERROR": |
| return nil, errors.New(reply.ErrorMessage) |
| @@ -229,8 +215,8 @@ func (r *remoteService) registerInstance(packageName, instanceID string) (*regis |
| return nil, fmt.Errorf("Unexpected register package status: %s", reply.Status) |
| } |
| -func (r *remoteService) fetchInstance(packageName, instanceID string) (*fetchInstanceResponse, error) { |
| - endpoint, err := instanceEndpoint(packageName, instanceID) |
| +func (r *remoteImpl) fetchInstance(pin common.Pin) (*fetchInstanceResponse, error) { |
| + endpoint, err := instanceEndpoint(pin) |
| if err != nil { |
| return nil, err |
| } |
| @@ -246,25 +232,26 @@ func (r *remoteService) fetchInstance(packageName, instanceID string) (*fetchIns |
| } |
| switch reply.Status { |
| case "SUCCESS": |
| - info, err := makePackageInstanceInfo(reply.Instance) |
| + ts, err := convertTimestamp(reply.Instance.RegisteredTs) |
| if err != nil { |
| return nil, err |
| } |
| return &fetchInstanceResponse{ |
| - FetchURL: reply.FetchURL, |
| - Info: info, |
| + fetchURL: reply.FetchURL, |
| + registeredBy: reply.Instance.RegisteredBy, |
| + registeredTs: ts, |
| }, nil |
| case "PACKAGE_NOT_FOUND": |
| - return nil, fmt.Errorf("Package '%s' is not registered or you do not have permission to fetch it", packageName) |
| + return nil, fmt.Errorf("Package '%s' is not registered or you do not have permission to fetch it", pin.PackageName) |
| case "INSTANCE_NOT_FOUND": |
| - return nil, fmt.Errorf("Package '%s' doesn't have instance '%s'", packageName, instanceID) |
| + return nil, fmt.Errorf("Package '%s' doesn't have instance '%s'", pin.PackageName, pin.InstanceID) |
| case "ERROR": |
| return nil, errors.New(reply.ErrorMessage) |
| } |
| return nil, fmt.Errorf("Unexpected reply status: %s", reply.Status) |
| } |
| -func (r *remoteService) fetchACL(packagePath string) ([]PackageACL, error) { |
| +func (r *remoteImpl) fetchACL(packagePath string) ([]PackageACL, error) { |
| endpoint, err := aclEndpoint(packagePath) |
| if err != nil { |
| return nil, err |
| @@ -309,7 +296,7 @@ func (r *remoteService) fetchACL(packagePath string) ([]PackageACL, error) { |
| return nil, fmt.Errorf("Unexpected reply status: %s", reply.Status) |
| } |
| -func (r *remoteService) modifyACL(packagePath string, changes []PackageACLChange) error { |
| +func (r *remoteImpl) modifyACL(packagePath string, changes []PackageACLChange) error { |
| endpoint, err := aclEndpoint(packagePath) |
| if err != nil { |
| return err |
| @@ -349,22 +336,19 @@ func (r *remoteService) modifyACL(packagePath string, changes []PackageACLChange |
| return fmt.Errorf("Unexpected reply status: %s", reply.Status) |
| } |
| -func (r *remoteService) attachTags(packageName, instanceID string, tags []string) error { |
| +func (r *remoteImpl) attachTags(pin common.Pin, tags []string) error { |
| // Tags will be passed in the request body, not via URL. |
| - endpoint, err := tagsEndpoint(packageName, instanceID, nil) |
| + endpoint, err := tagsEndpoint(pin, nil) |
| if err != nil { |
| return err |
| } |
| - |
| - if len(tags) == 0 { |
| - return errors.New("At least one tag must be provided") |
| - } |
| for _, tag := range tags { |
| - err = ValidateInstanceTag(tag) |
| + err = common.ValidateInstanceTag(tag) |
| if err != nil { |
| return err |
| } |
| } |
| + |
| var request struct { |
| Tags []string `json:"tags"` |
| } |
| @@ -391,23 +375,19 @@ func (r *remoteService) attachTags(packageName, instanceID string, tags []string |
| //////////////////////////////////////////////////////////////////////////////// |
| -func instanceEndpoint(packageName, instanceID string) (string, error) { |
| - err := ValidatePackageName(packageName) |
| - if err != nil { |
| - return "", err |
| - } |
| - err = ValidateInstanceID(instanceID) |
| +func instanceEndpoint(pin common.Pin) (string, error) { |
| + err := common.ValidatePin(pin) |
| if err != nil { |
| return "", err |
| } |
| params := url.Values{} |
| - params.Add("package_name", packageName) |
| - params.Add("instance_id", instanceID) |
| + params.Add("package_name", pin.PackageName) |
| + params.Add("instance_id", pin.InstanceID) |
| return "repo/v1/instance?" + params.Encode(), nil |
| } |
| func aclEndpoint(packagePath string) (string, error) { |
| - err := ValidatePackageName(packagePath) |
| + err := common.ValidatePackageName(packagePath) |
| if err != nil { |
| return "", err |
| } |
| @@ -416,24 +396,20 @@ func aclEndpoint(packagePath string) (string, error) { |
| return "repo/v1/acl?" + params.Encode(), nil |
| } |
| -func tagsEndpoint(packageName, instanceID string, tags []string) (string, error) { |
| - err := ValidatePackageName(packageName) |
| - if err != nil { |
| - return "", err |
| - } |
| - err = ValidateInstanceID(instanceID) |
| +func tagsEndpoint(pin common.Pin, tags []string) (string, error) { |
| + err := common.ValidatePin(pin) |
| if err != nil { |
| return "", err |
| } |
| for _, tag := range tags { |
| - err = ValidateInstanceTag(tag) |
| + err = common.ValidateInstanceTag(tag) |
| if err != nil { |
| return "", err |
| } |
| } |
| params := url.Values{} |
| - params.Add("package_name", packageName) |
| - params.Add("instance_id", instanceID) |
| + params.Add("package_name", pin.PackageName) |
| + params.Add("instance_id", pin.InstanceID) |
| for _, tag := range tags { |
| params.Add("tag", tag) |
| } |
| @@ -449,17 +425,3 @@ func convertTimestamp(ts string) (time.Time, error) { |
| } |
| return time.Unix(0, i*1000), nil |
| } |
| - |
| -func makePackageInstanceInfo(msg packageInstanceMsg) (pi packageInstanceInfo, err error) { |
| - ts, err := convertTimestamp(msg.RegisteredTs) |
| - if err != nil { |
| - return |
| - } |
| - pi = packageInstanceInfo{ |
| - PackageName: msg.PackageName, |
| - InstanceID: msg.InstanceID, |
| - RegisteredBy: msg.RegisteredBy, |
| - RegisteredTs: ts, |
| - } |
| - return |
| -} |