Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(704)

Unified Diff: go/src/infra/tools/cipd/remote.go

Issue 1129043003: cipd: Refactor client to make it more readable. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « go/src/infra/tools/cipd/reader_test.go ('k') | go/src/infra/tools/cipd/remote_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: go/src/infra/tools/cipd/remote.go
diff --git a/go/src/infra/tools/cipd/remote.go b/go/src/infra/tools/cipd/remote.go
index 627fbba0def1f45319ce5fa4c6f640642e99378a..7bb25f8e8f62f5a883967f4344721fe613e1f786 100644
--- a/go/src/infra/tools/cipd/remote.go
+++ b/go/src/infra/tools/cipd/remote.go
@@ -16,23 +16,12 @@ import (
"strconv"
"time"
- "infra/libs/logging"
+ "infra/tools/cipd/common"
)
-// remoteService is a wrapper around Cloud Endpoints APIs exposed by backend.
-// See //appengine/chrome_infra_packages.
-type remoteService struct {
- client *http.Client
- serviceURL string
- log logging.Logger
-}
-
-type uploadSession struct {
- ID string
- URL string
-}
+// remoteMaxRetries is how many times to retry transient HTTP errors.
+const remoteMaxRetries = 10
-// packageInstanceMsg corresponds to PackageInstance message on the backend.
type packageInstanceMsg struct {
PackageName string `json:"package_name"`
InstanceID string `json:"instance_id"`
@@ -40,26 +29,6 @@ type packageInstanceMsg struct {
RegisteredTs string `json:"registered_ts"`
}
-// packageInstance is almost like packageInstanceMsg, but with timestamp as
-// time.Time. See also makePackageInstanceInfo.
-type packageInstanceInfo struct {
- PackageName string
- InstanceID string
- RegisteredBy string
- RegisteredTs time.Time
-}
-
-type registerInstanceResponse struct {
- UploadSession *uploadSession
- AlreadyRegistered bool
- Info packageInstanceInfo
-}
-
-type fetchInstanceResponse struct {
- FetchURL string
- Info packageInstanceInfo
-}
-
// roleChangeMsg corresponds to RoleChange proto message on backend.
type roleChangeMsg struct {
Action string `json:"action"`
@@ -77,32 +46,42 @@ func (e *pendingProcessingError) Error() string {
return e.message
}
-// newRemoteService is mocked in tests.
-var newRemoteService = func(client *http.Client, url string, log logging.Logger) *remoteService {
- log.Infof("cipd: service URL is %s", url)
- return &remoteService{
- client: client,
- serviceURL: url,
- log: log,
- }
+// remoteImpl implements remote on top of real HTTP calls.
+type remoteImpl struct {
+ client *Client
+}
+
+func isTemporaryNetError(err error) bool {
+ // TODO(vadimsh): Figure out how to recognize dial timeouts, read timeouts,
+ // etc. For now all errors that end up here are considered temporary.
+ return true
}
-// makeRequest sends POST or GET request with retries.
-func (r *remoteService) makeRequest(path, method string, request, response interface{}) error {
+// isTemporaryHTTPError returns true for HTTP status codes that indicate
+// a temporary error that may go away if request is retried.
+func isTemporaryHTTPError(statusCode int) bool {
+ return statusCode >= 500 || statusCode == 408 || statusCode == 429
+}
+
+// makeRequest sends POST or GET REST JSON requests with retries.
+func (r *remoteImpl) makeRequest(path, method string, request, response interface{}) error {
var body []byte
- var err error
if request != nil {
- body, err = json.Marshal(request)
+ b, err := json.Marshal(request)
if err != nil {
return err
}
+ body = b
}
- url := fmt.Sprintf("%s/_ah/api/%s", r.serviceURL, path)
- for attempt := 0; attempt < 10; attempt++ {
+
+ url := fmt.Sprintf("%s/_ah/api/%s", r.client.ServiceURL, path)
+ for attempt := 0; attempt < remoteMaxRetries; attempt++ {
if attempt != 0 {
- r.log.Warningf("cipd: retrying request to %s", url)
- clock.Sleep(2 * time.Second)
+ r.client.Log.Warningf("cipd: retrying request to %s", url)
+ r.client.clock.sleep(2 * time.Second)
}
+
+ // Prepare request.
var bodyReader io.Reader
if body != nil {
bodyReader = bytes.NewReader(body)
@@ -114,29 +93,48 @@ func (r *remoteService) makeRequest(path, method string, request, response inter
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
- req.Header.Set("User-Agent", userAgent())
- resp, err := r.client.Do(req)
+ req.Header.Set("User-Agent", r.client.UserAgent)
+
+ // Connect, read response.
+ r.client.Log.Debugf("cipd: %s %s", method, url)
+ resp, err := r.client.doAuthenticatedHTTPRequest(req)
if err != nil {
+ if isTemporaryNetError(err) {
+ r.client.Log.Warningf("cipd: connectivity error (%s)", err)
+ continue
+ }
+ return err
+ }
+ responseBody, err := ioutil.ReadAll(resp.Body)
+ resp.Body.Close()
+ if err != nil {
+ if isTemporaryNetError(err) {
+ r.client.Log.Warningf("cipd: temporary error when reading response (%s)", err)
+ continue
+ }
return err
}
+ r.client.Log.Debugf("cipd: http %d: %s", resp.StatusCode, body)
+ if isTemporaryHTTPError(resp.StatusCode) {
+ continue
+ }
+
// Success?
if resp.StatusCode < 300 {
- defer resp.Body.Close()
- return json.NewDecoder(resp.Body).Decode(response)
+ return json.Unmarshal(responseBody, response)
}
+
// Fatal error?
- if resp.StatusCode >= 300 && resp.StatusCode < 500 {
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- return fmt.Errorf("Unexpected reply (HTTP %d):\n%s", resp.StatusCode, string(body))
+ if resp.StatusCode == 403 || resp.StatusCode == 401 {
+ return ErrAccessDenined
}
- // Retry.
- resp.Body.Close()
+ return fmt.Errorf("Unexpected reply (HTTP %d):\n%s", resp.StatusCode, string(body))
}
- return fmt.Errorf("Request to %s failed after 10 attempts", url)
+
+ return ErrBackendInaccessible
}
-func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error) {
+func (r *remoteImpl) initiateUpload(sha1 string) (s *UploadSession, err error) {
var reply struct {
Status string `json:"status"`
UploadSessionID string `json:"upload_session_id"`
@@ -151,10 +149,7 @@ func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error
case "ALREADY_UPLOADED":
return
case "SUCCESS":
- s = &uploadSession{
- ID: reply.UploadSessionID,
- URL: reply.UploadURL,
- }
+ s = &UploadSession{reply.UploadSessionID, reply.UploadURL}
case "ERROR":
err = fmt.Errorf("Server replied with error: %s", reply.ErrorMessage)
default:
@@ -163,7 +158,7 @@ func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error
return
}
-func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err error) {
+func (r *remoteImpl) finalizeUpload(sessionID string) (finished bool, err error) {
var reply struct {
Status string `json:"status"`
ErrorMessage string `json:"error_message"`
@@ -174,7 +169,7 @@ func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err err
}
switch reply.Status {
case "MISSING":
- err = errors.New("Upload session is unexpectedly missing")
+ err = ErrUploadSessionDied
case "UPLOADING", "VERIFYING":
finished = false
case "PUBLISHED":
@@ -187,8 +182,8 @@ func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err err
return
}
-func (r *remoteService) registerInstance(packageName, instanceID string) (*registerInstanceResponse, error) {
- endpoint, err := instanceEndpoint(packageName, instanceID)
+func (r *remoteImpl) registerInstance(pin common.Pin) (*registerInstanceResponse, error) {
+ endpoint, err := instanceEndpoint(pin)
if err != nil {
return nil, err
}
@@ -205,23 +200,21 @@ func (r *remoteService) registerInstance(packageName, instanceID string) (*regis
}
switch reply.Status {
case "REGISTERED", "ALREADY_REGISTERED":
- info, err := makePackageInstanceInfo(reply.Instance)
+ ts, err := convertTimestamp(reply.Instance.RegisteredTs)
if err != nil {
return nil, err
}
return &registerInstanceResponse{
- AlreadyRegistered: reply.Status == "ALREADY_REGISTERED",
- Info: info,
+ alreadyRegistered: reply.Status == "ALREADY_REGISTERED",
+ registeredBy: reply.Instance.RegisteredBy,
+ registeredTs: ts,
}, nil
case "UPLOAD_FIRST":
if reply.UploadSessionID == "" {
- return nil, errors.New("Server didn't provide upload session ID")
+ return nil, ErrNoUploadSessionID
}
return &registerInstanceResponse{
- UploadSession: &uploadSession{
- ID: reply.UploadSessionID,
- URL: reply.UploadURL,
- },
+ uploadSession: &UploadSession{reply.UploadSessionID, reply.UploadURL},
}, nil
case "ERROR":
return nil, errors.New(reply.ErrorMessage)
@@ -229,8 +222,8 @@ func (r *remoteService) registerInstance(packageName, instanceID string) (*regis
return nil, fmt.Errorf("Unexpected register package status: %s", reply.Status)
}
-func (r *remoteService) fetchInstance(packageName, instanceID string) (*fetchInstanceResponse, error) {
- endpoint, err := instanceEndpoint(packageName, instanceID)
+func (r *remoteImpl) fetchInstance(pin common.Pin) (*fetchInstanceResponse, error) {
+ endpoint, err := instanceEndpoint(pin)
if err != nil {
return nil, err
}
@@ -246,25 +239,26 @@ func (r *remoteService) fetchInstance(packageName, instanceID string) (*fetchIns
}
switch reply.Status {
case "SUCCESS":
- info, err := makePackageInstanceInfo(reply.Instance)
+ ts, err := convertTimestamp(reply.Instance.RegisteredTs)
if err != nil {
return nil, err
}
return &fetchInstanceResponse{
- FetchURL: reply.FetchURL,
- Info: info,
+ fetchURL: reply.FetchURL,
+ registeredBy: reply.Instance.RegisteredBy,
+ registeredTs: ts,
}, nil
case "PACKAGE_NOT_FOUND":
- return nil, fmt.Errorf("Package '%s' is not registered or you do not have permission to fetch it", packageName)
+ return nil, fmt.Errorf("Package '%s' is not registered or you do not have permission to fetch it", pin.PackageName)
case "INSTANCE_NOT_FOUND":
- return nil, fmt.Errorf("Package '%s' doesn't have instance '%s'", packageName, instanceID)
+ return nil, fmt.Errorf("Package '%s' doesn't have instance '%s'", pin.PackageName, pin.InstanceID)
case "ERROR":
return nil, errors.New(reply.ErrorMessage)
}
return nil, fmt.Errorf("Unexpected reply status: %s", reply.Status)
}
-func (r *remoteService) fetchACL(packagePath string) ([]PackageACL, error) {
+func (r *remoteImpl) fetchACL(packagePath string) ([]PackageACL, error) {
endpoint, err := aclEndpoint(packagePath)
if err != nil {
return nil, err
@@ -309,7 +303,7 @@ func (r *remoteService) fetchACL(packagePath string) ([]PackageACL, error) {
return nil, fmt.Errorf("Unexpected reply status: %s", reply.Status)
}
-func (r *remoteService) modifyACL(packagePath string, changes []PackageACLChange) error {
+func (r *remoteImpl) modifyACL(packagePath string, changes []PackageACLChange) error {
endpoint, err := aclEndpoint(packagePath)
if err != nil {
return err
@@ -349,22 +343,19 @@ func (r *remoteService) modifyACL(packagePath string, changes []PackageACLChange
return fmt.Errorf("Unexpected reply status: %s", reply.Status)
}
-func (r *remoteService) attachTags(packageName, instanceID string, tags []string) error {
+func (r *remoteImpl) attachTags(pin common.Pin, tags []string) error {
// Tags will be passed in the request body, not via URL.
- endpoint, err := tagsEndpoint(packageName, instanceID, nil)
+ endpoint, err := tagsEndpoint(pin, nil)
if err != nil {
return err
}
-
- if len(tags) == 0 {
- return errors.New("At least one tag must be provided")
- }
for _, tag := range tags {
- err = ValidateInstanceTag(tag)
+ err = common.ValidateInstanceTag(tag)
if err != nil {
return err
}
}
+
var request struct {
Tags []string `json:"tags"`
}
@@ -391,23 +382,19 @@ func (r *remoteService) attachTags(packageName, instanceID string, tags []string
////////////////////////////////////////////////////////////////////////////////
-func instanceEndpoint(packageName, instanceID string) (string, error) {
- err := ValidatePackageName(packageName)
- if err != nil {
- return "", err
- }
- err = ValidateInstanceID(instanceID)
+func instanceEndpoint(pin common.Pin) (string, error) {
+ err := common.ValidatePin(pin)
if err != nil {
return "", err
}
params := url.Values{}
- params.Add("package_name", packageName)
- params.Add("instance_id", instanceID)
+ params.Add("package_name", pin.PackageName)
+ params.Add("instance_id", pin.InstanceID)
return "repo/v1/instance?" + params.Encode(), nil
}
func aclEndpoint(packagePath string) (string, error) {
- err := ValidatePackageName(packagePath)
+ err := common.ValidatePackageName(packagePath)
if err != nil {
return "", err
}
@@ -416,24 +403,20 @@ func aclEndpoint(packagePath string) (string, error) {
return "repo/v1/acl?" + params.Encode(), nil
}
-func tagsEndpoint(packageName, instanceID string, tags []string) (string, error) {
- err := ValidatePackageName(packageName)
- if err != nil {
- return "", err
- }
- err = ValidateInstanceID(instanceID)
+func tagsEndpoint(pin common.Pin, tags []string) (string, error) {
+ err := common.ValidatePin(pin)
if err != nil {
return "", err
}
for _, tag := range tags {
- err = ValidateInstanceTag(tag)
+ err = common.ValidateInstanceTag(tag)
if err != nil {
return "", err
}
}
params := url.Values{}
- params.Add("package_name", packageName)
- params.Add("instance_id", instanceID)
+ params.Add("package_name", pin.PackageName)
+ params.Add("instance_id", pin.InstanceID)
for _, tag := range tags {
params.Add("tag", tag)
}
@@ -449,17 +432,3 @@ func convertTimestamp(ts string) (time.Time, error) {
}
return time.Unix(0, i*1000), nil
}
-
-func makePackageInstanceInfo(msg packageInstanceMsg) (pi packageInstanceInfo, err error) {
- ts, err := convertTimestamp(msg.RegisteredTs)
- if err != nil {
- return
- }
- pi = packageInstanceInfo{
- PackageName: msg.PackageName,
- InstanceID: msg.InstanceID,
- RegisteredBy: msg.RegisteredBy,
- RegisteredTs: ts,
- }
- return
-}
« no previous file with comments | « go/src/infra/tools/cipd/reader_test.go ('k') | go/src/infra/tools/cipd/remote_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698