Index: go/src/infra/tools/cipd/remote.go |
diff --git a/go/src/infra/tools/cipd/remote.go b/go/src/infra/tools/cipd/remote.go |
index 627fbba0def1f45319ce5fa4c6f640642e99378a..7bb25f8e8f62f5a883967f4344721fe613e1f786 100644 |
--- a/go/src/infra/tools/cipd/remote.go |
+++ b/go/src/infra/tools/cipd/remote.go |
@@ -16,23 +16,12 @@ import ( |
"strconv" |
"time" |
- "infra/libs/logging" |
+ "infra/tools/cipd/common" |
) |
-// remoteService is a wrapper around Cloud Endpoints APIs exposed by backend. |
-// See //appengine/chrome_infra_packages. |
-type remoteService struct { |
- client *http.Client |
- serviceURL string |
- log logging.Logger |
-} |
- |
-type uploadSession struct { |
- ID string |
- URL string |
-} |
+// remoteMaxRetries is how many times to retry transient HTTP errors. |
+const remoteMaxRetries = 10 |
-// packageInstanceMsg corresponds to PackageInstance message on the backend. |
type packageInstanceMsg struct { |
PackageName string `json:"package_name"` |
InstanceID string `json:"instance_id"` |
@@ -40,26 +29,6 @@ type packageInstanceMsg struct { |
RegisteredTs string `json:"registered_ts"` |
} |
-// packageInstance is almost like packageInstanceMsg, but with timestamp as |
-// time.Time. See also makePackageInstanceInfo. |
-type packageInstanceInfo struct { |
- PackageName string |
- InstanceID string |
- RegisteredBy string |
- RegisteredTs time.Time |
-} |
- |
-type registerInstanceResponse struct { |
- UploadSession *uploadSession |
- AlreadyRegistered bool |
- Info packageInstanceInfo |
-} |
- |
-type fetchInstanceResponse struct { |
- FetchURL string |
- Info packageInstanceInfo |
-} |
- |
// roleChangeMsg corresponds to RoleChange proto message on backend. |
type roleChangeMsg struct { |
Action string `json:"action"` |
@@ -77,32 +46,42 @@ func (e *pendingProcessingError) Error() string { |
return e.message |
} |
-// newRemoteService is mocked in tests. |
-var newRemoteService = func(client *http.Client, url string, log logging.Logger) *remoteService { |
- log.Infof("cipd: service URL is %s", url) |
- return &remoteService{ |
- client: client, |
- serviceURL: url, |
- log: log, |
- } |
+// remoteImpl implements remote on top of real HTTP calls. |
+type remoteImpl struct { |
+ client *Client |
+} |
+ |
+func isTemporaryNetError(err error) bool { |
+ // TODO(vadimsh): Figure out how to recognize dial timeouts, read timeouts, |
+ // etc. For now all errors that end up here are considered temporary. |
+ return true |
} |
-// makeRequest sends POST or GET request with retries. |
-func (r *remoteService) makeRequest(path, method string, request, response interface{}) error { |
+// isTemporaryHTTPError returns true for HTTP status codes that indicate |
+// a temporary error that may go away if request is retried. |
+func isTemporaryHTTPError(statusCode int) bool { |
+ return statusCode >= 500 || statusCode == 408 || statusCode == 429 |
+} |
+ |
+// makeRequest sends POST or GET REST JSON requests with retries. |
+func (r *remoteImpl) makeRequest(path, method string, request, response interface{}) error { |
var body []byte |
- var err error |
if request != nil { |
- body, err = json.Marshal(request) |
+ b, err := json.Marshal(request) |
if err != nil { |
return err |
} |
+ body = b |
} |
- url := fmt.Sprintf("%s/_ah/api/%s", r.serviceURL, path) |
- for attempt := 0; attempt < 10; attempt++ { |
+ |
+ url := fmt.Sprintf("%s/_ah/api/%s", r.client.ServiceURL, path) |
+ for attempt := 0; attempt < remoteMaxRetries; attempt++ { |
if attempt != 0 { |
- r.log.Warningf("cipd: retrying request to %s", url) |
- clock.Sleep(2 * time.Second) |
+ r.client.Log.Warningf("cipd: retrying request to %s", url) |
+ r.client.clock.sleep(2 * time.Second) |
} |
+ |
+ // Prepare request. |
var bodyReader io.Reader |
if body != nil { |
bodyReader = bytes.NewReader(body) |
@@ -114,29 +93,48 @@ func (r *remoteService) makeRequest(path, method string, request, response inter |
if body != nil { |
req.Header.Set("Content-Type", "application/json") |
} |
- req.Header.Set("User-Agent", userAgent()) |
- resp, err := r.client.Do(req) |
+ req.Header.Set("User-Agent", r.client.UserAgent) |
+ |
+ // Connect, read response. |
+ r.client.Log.Debugf("cipd: %s %s", method, url) |
+ resp, err := r.client.doAuthenticatedHTTPRequest(req) |
if err != nil { |
+ if isTemporaryNetError(err) { |
+ r.client.Log.Warningf("cipd: connectivity error (%s)", err) |
+ continue |
+ } |
+ return err |
+ } |
+ responseBody, err := ioutil.ReadAll(resp.Body) |
+ resp.Body.Close() |
+ if err != nil { |
+ if isTemporaryNetError(err) { |
+ r.client.Log.Warningf("cipd: temporary error when reading response (%s)", err) |
+ continue |
+ } |
return err |
} |
+ r.client.Log.Debugf("cipd: http %d: %s", resp.StatusCode, body) |
+ if isTemporaryHTTPError(resp.StatusCode) { |
+ continue |
+ } |
+ |
// Success? |
if resp.StatusCode < 300 { |
- defer resp.Body.Close() |
- return json.NewDecoder(resp.Body).Decode(response) |
+ return json.Unmarshal(responseBody, response) |
} |
+ |
// Fatal error? |
- if resp.StatusCode >= 300 && resp.StatusCode < 500 { |
- defer resp.Body.Close() |
- body, _ := ioutil.ReadAll(resp.Body) |
- return fmt.Errorf("Unexpected reply (HTTP %d):\n%s", resp.StatusCode, string(body)) |
+ if resp.StatusCode == 403 || resp.StatusCode == 401 { |
+ return ErrAccessDenined |
} |
- // Retry. |
- resp.Body.Close() |
+ return fmt.Errorf("Unexpected reply (HTTP %d):\n%s", resp.StatusCode, string(body)) |
} |
- return fmt.Errorf("Request to %s failed after 10 attempts", url) |
+ |
+ return ErrBackendInaccessible |
} |
-func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error) { |
+func (r *remoteImpl) initiateUpload(sha1 string) (s *UploadSession, err error) { |
var reply struct { |
Status string `json:"status"` |
UploadSessionID string `json:"upload_session_id"` |
@@ -151,10 +149,7 @@ func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error |
case "ALREADY_UPLOADED": |
return |
case "SUCCESS": |
- s = &uploadSession{ |
- ID: reply.UploadSessionID, |
- URL: reply.UploadURL, |
- } |
+ s = &UploadSession{reply.UploadSessionID, reply.UploadURL} |
case "ERROR": |
err = fmt.Errorf("Server replied with error: %s", reply.ErrorMessage) |
default: |
@@ -163,7 +158,7 @@ func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error |
return |
} |
-func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err error) { |
+func (r *remoteImpl) finalizeUpload(sessionID string) (finished bool, err error) { |
var reply struct { |
Status string `json:"status"` |
ErrorMessage string `json:"error_message"` |
@@ -174,7 +169,7 @@ func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err err |
} |
switch reply.Status { |
case "MISSING": |
- err = errors.New("Upload session is unexpectedly missing") |
+ err = ErrUploadSessionDied |
case "UPLOADING", "VERIFYING": |
finished = false |
case "PUBLISHED": |
@@ -187,8 +182,8 @@ func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err err |
return |
} |
-func (r *remoteService) registerInstance(packageName, instanceID string) (*registerInstanceResponse, error) { |
- endpoint, err := instanceEndpoint(packageName, instanceID) |
+func (r *remoteImpl) registerInstance(pin common.Pin) (*registerInstanceResponse, error) { |
+ endpoint, err := instanceEndpoint(pin) |
if err != nil { |
return nil, err |
} |
@@ -205,23 +200,21 @@ func (r *remoteService) registerInstance(packageName, instanceID string) (*regis |
} |
switch reply.Status { |
case "REGISTERED", "ALREADY_REGISTERED": |
- info, err := makePackageInstanceInfo(reply.Instance) |
+ ts, err := convertTimestamp(reply.Instance.RegisteredTs) |
if err != nil { |
return nil, err |
} |
return ®isterInstanceResponse{ |
- AlreadyRegistered: reply.Status == "ALREADY_REGISTERED", |
- Info: info, |
+ alreadyRegistered: reply.Status == "ALREADY_REGISTERED", |
+ registeredBy: reply.Instance.RegisteredBy, |
+ registeredTs: ts, |
}, nil |
case "UPLOAD_FIRST": |
if reply.UploadSessionID == "" { |
- return nil, errors.New("Server didn't provide upload session ID") |
+ return nil, ErrNoUploadSessionID |
} |
return ®isterInstanceResponse{ |
- UploadSession: &uploadSession{ |
- ID: reply.UploadSessionID, |
- URL: reply.UploadURL, |
- }, |
+ uploadSession: &UploadSession{reply.UploadSessionID, reply.UploadURL}, |
}, nil |
case "ERROR": |
return nil, errors.New(reply.ErrorMessage) |
@@ -229,8 +222,8 @@ func (r *remoteService) registerInstance(packageName, instanceID string) (*regis |
return nil, fmt.Errorf("Unexpected register package status: %s", reply.Status) |
} |
-func (r *remoteService) fetchInstance(packageName, instanceID string) (*fetchInstanceResponse, error) { |
- endpoint, err := instanceEndpoint(packageName, instanceID) |
+func (r *remoteImpl) fetchInstance(pin common.Pin) (*fetchInstanceResponse, error) { |
+ endpoint, err := instanceEndpoint(pin) |
if err != nil { |
return nil, err |
} |
@@ -246,25 +239,26 @@ func (r *remoteService) fetchInstance(packageName, instanceID string) (*fetchIns |
} |
switch reply.Status { |
case "SUCCESS": |
- info, err := makePackageInstanceInfo(reply.Instance) |
+ ts, err := convertTimestamp(reply.Instance.RegisteredTs) |
if err != nil { |
return nil, err |
} |
return &fetchInstanceResponse{ |
- FetchURL: reply.FetchURL, |
- Info: info, |
+ fetchURL: reply.FetchURL, |
+ registeredBy: reply.Instance.RegisteredBy, |
+ registeredTs: ts, |
}, nil |
case "PACKAGE_NOT_FOUND": |
- return nil, fmt.Errorf("Package '%s' is not registered or you do not have permission to fetch it", packageName) |
+ return nil, fmt.Errorf("Package '%s' is not registered or you do not have permission to fetch it", pin.PackageName) |
case "INSTANCE_NOT_FOUND": |
- return nil, fmt.Errorf("Package '%s' doesn't have instance '%s'", packageName, instanceID) |
+ return nil, fmt.Errorf("Package '%s' doesn't have instance '%s'", pin.PackageName, pin.InstanceID) |
case "ERROR": |
return nil, errors.New(reply.ErrorMessage) |
} |
return nil, fmt.Errorf("Unexpected reply status: %s", reply.Status) |
} |
-func (r *remoteService) fetchACL(packagePath string) ([]PackageACL, error) { |
+func (r *remoteImpl) fetchACL(packagePath string) ([]PackageACL, error) { |
endpoint, err := aclEndpoint(packagePath) |
if err != nil { |
return nil, err |
@@ -309,7 +303,7 @@ func (r *remoteService) fetchACL(packagePath string) ([]PackageACL, error) { |
return nil, fmt.Errorf("Unexpected reply status: %s", reply.Status) |
} |
-func (r *remoteService) modifyACL(packagePath string, changes []PackageACLChange) error { |
+func (r *remoteImpl) modifyACL(packagePath string, changes []PackageACLChange) error { |
endpoint, err := aclEndpoint(packagePath) |
if err != nil { |
return err |
@@ -349,22 +343,19 @@ func (r *remoteService) modifyACL(packagePath string, changes []PackageACLChange |
return fmt.Errorf("Unexpected reply status: %s", reply.Status) |
} |
-func (r *remoteService) attachTags(packageName, instanceID string, tags []string) error { |
+func (r *remoteImpl) attachTags(pin common.Pin, tags []string) error { |
// Tags will be passed in the request body, not via URL. |
- endpoint, err := tagsEndpoint(packageName, instanceID, nil) |
+ endpoint, err := tagsEndpoint(pin, nil) |
if err != nil { |
return err |
} |
- |
- if len(tags) == 0 { |
- return errors.New("At least one tag must be provided") |
- } |
for _, tag := range tags { |
- err = ValidateInstanceTag(tag) |
+ err = common.ValidateInstanceTag(tag) |
if err != nil { |
return err |
} |
} |
+ |
var request struct { |
Tags []string `json:"tags"` |
} |
@@ -391,23 +382,19 @@ func (r *remoteService) attachTags(packageName, instanceID string, tags []string |
//////////////////////////////////////////////////////////////////////////////// |
-func instanceEndpoint(packageName, instanceID string) (string, error) { |
- err := ValidatePackageName(packageName) |
- if err != nil { |
- return "", err |
- } |
- err = ValidateInstanceID(instanceID) |
+func instanceEndpoint(pin common.Pin) (string, error) { |
+ err := common.ValidatePin(pin) |
if err != nil { |
return "", err |
} |
params := url.Values{} |
- params.Add("package_name", packageName) |
- params.Add("instance_id", instanceID) |
+ params.Add("package_name", pin.PackageName) |
+ params.Add("instance_id", pin.InstanceID) |
return "repo/v1/instance?" + params.Encode(), nil |
} |
func aclEndpoint(packagePath string) (string, error) { |
- err := ValidatePackageName(packagePath) |
+ err := common.ValidatePackageName(packagePath) |
if err != nil { |
return "", err |
} |
@@ -416,24 +403,20 @@ func aclEndpoint(packagePath string) (string, error) { |
return "repo/v1/acl?" + params.Encode(), nil |
} |
-func tagsEndpoint(packageName, instanceID string, tags []string) (string, error) { |
- err := ValidatePackageName(packageName) |
- if err != nil { |
- return "", err |
- } |
- err = ValidateInstanceID(instanceID) |
+func tagsEndpoint(pin common.Pin, tags []string) (string, error) { |
+ err := common.ValidatePin(pin) |
if err != nil { |
return "", err |
} |
for _, tag := range tags { |
- err = ValidateInstanceTag(tag) |
+ err = common.ValidateInstanceTag(tag) |
if err != nil { |
return "", err |
} |
} |
params := url.Values{} |
- params.Add("package_name", packageName) |
- params.Add("instance_id", instanceID) |
+ params.Add("package_name", pin.PackageName) |
+ params.Add("instance_id", pin.InstanceID) |
for _, tag := range tags { |
params.Add("tag", tag) |
} |
@@ -449,17 +432,3 @@ func convertTimestamp(ts string) (time.Time, error) { |
} |
return time.Unix(0, i*1000), nil |
} |
- |
-func makePackageInstanceInfo(msg packageInstanceMsg) (pi packageInstanceInfo, err error) { |
- ts, err := convertTimestamp(msg.RegisteredTs) |
- if err != nil { |
- return |
- } |
- pi = packageInstanceInfo{ |
- PackageName: msg.PackageName, |
- InstanceID: msg.InstanceID, |
- RegisteredBy: msg.RegisteredBy, |
- RegisteredTs: ts, |
- } |
- return |
-} |