| Index: go/src/infra/tools/cipd/remote.go
|
| diff --git a/go/src/infra/tools/cipd/remote.go b/go/src/infra/tools/cipd/remote.go
|
| index 627fbba0def1f45319ce5fa4c6f640642e99378a..ef46e2a1efb08b6e03d3bbe27eb07886800e7d4d 100644
|
| --- a/go/src/infra/tools/cipd/remote.go
|
| +++ b/go/src/infra/tools/cipd/remote.go
|
| @@ -16,23 +16,12 @@ import (
|
| "strconv"
|
| "time"
|
|
|
| - "infra/libs/logging"
|
| + "infra/tools/cipd/common"
|
| )
|
|
|
| -// remoteService is a wrapper around Cloud Endpoints APIs exposed by backend.
|
| -// See //appengine/chrome_infra_packages.
|
| -type remoteService struct {
|
| - client *http.Client
|
| - serviceURL string
|
| - log logging.Logger
|
| -}
|
| -
|
| -type uploadSession struct {
|
| - ID string
|
| - URL string
|
| -}
|
| +// remoteMaxRetries is how many times to retry transient HTTP errors.
|
| +const remoteMaxRetries = 10
|
|
|
| -// packageInstanceMsg corresponds to PackageInstance message on the backend.
|
| type packageInstanceMsg struct {
|
| PackageName string `json:"package_name"`
|
| InstanceID string `json:"instance_id"`
|
| @@ -40,26 +29,6 @@ type packageInstanceMsg struct {
|
| RegisteredTs string `json:"registered_ts"`
|
| }
|
|
|
| -// packageInstance is almost like packageInstanceMsg, but with timestamp as
|
| -// time.Time. See also makePackageInstanceInfo.
|
| -type packageInstanceInfo struct {
|
| - PackageName string
|
| - InstanceID string
|
| - RegisteredBy string
|
| - RegisteredTs time.Time
|
| -}
|
| -
|
| -type registerInstanceResponse struct {
|
| - UploadSession *uploadSession
|
| - AlreadyRegistered bool
|
| - Info packageInstanceInfo
|
| -}
|
| -
|
| -type fetchInstanceResponse struct {
|
| - FetchURL string
|
| - Info packageInstanceInfo
|
| -}
|
| -
|
| // roleChangeMsg corresponds to RoleChange proto message on backend.
|
| type roleChangeMsg struct {
|
| Action string `json:"action"`
|
| @@ -77,32 +46,36 @@ func (e *pendingProcessingError) Error() string {
|
| return e.message
|
| }
|
|
|
| -// newRemoteService is mocked in tests.
|
| -var newRemoteService = func(client *http.Client, url string, log logging.Logger) *remoteService {
|
| - log.Infof("cipd: service URL is %s", url)
|
| - return &remoteService{
|
| - client: client,
|
| - serviceURL: url,
|
| - log: log,
|
| - }
|
| +// remoteImpl implements remote on top of real HTTP calls.
|
| +type remoteImpl struct {
|
| + client *Client
|
| +}
|
| +
|
| +func isTemporaryNetError(err error) bool {
|
| + // TODO(vadimsh): Figure out how to recognize dial timeouts, read timeouts,
|
| + // etc. For now all errors that end up here are considered retryable.
|
| + return true
|
| }
|
|
|
| -// makeRequest sends POST or GET request with retries.
|
| -func (r *remoteService) makeRequest(path, method string, request, response interface{}) error {
|
| +// makeRequest sends POST or GET REST JSON requests with retries.
|
| +func (r *remoteImpl) makeRequest(path, method string, request, response interface{}) error {
|
| var body []byte
|
| - var err error
|
| if request != nil {
|
| - body, err = json.Marshal(request)
|
| + b, err := json.Marshal(request)
|
| if err != nil {
|
| return err
|
| }
|
| + body = b
|
| }
|
| - url := fmt.Sprintf("%s/_ah/api/%s", r.serviceURL, path)
|
| - for attempt := 0; attempt < 10; attempt++ {
|
| +
|
| + url := fmt.Sprintf("%s/_ah/api/%s", r.client.ServiceURL, path)
|
| + for attempt := 0; attempt < remoteMaxRetries; attempt++ {
|
| if attempt != 0 {
|
| - r.log.Warningf("cipd: retrying request to %s", url)
|
| - clock.Sleep(2 * time.Second)
|
| + r.client.Log.Warningf("cipd: retrying request to %s", url)
|
| + r.client.clock.sleep(2 * time.Second)
|
| }
|
| +
|
| + // Prepare request.
|
| var bodyReader io.Reader
|
| if body != nil {
|
| bodyReader = bytes.NewReader(body)
|
| @@ -114,29 +87,47 @@ func (r *remoteService) makeRequest(path, method string, request, response inter
|
| if body != nil {
|
| req.Header.Set("Content-Type", "application/json")
|
| }
|
| - req.Header.Set("User-Agent", userAgent())
|
| - resp, err := r.client.Do(req)
|
| + req.Header.Set("User-Agent", r.client.UserAgent)
|
| +
|
| + // Connect, read response.
|
| + r.client.Log.Debugf("cipd: %s %s", method, url)
|
| + resp, err := r.client.doAuthenticatedHTTPRequest(req)
|
| if err != nil {
|
| + if isTemporaryNetError(err) {
|
| + r.client.Log.Warningf("cipd: connectivity error (%s)", err)
|
| + continue
|
| + }
|
| return err
|
| }
|
| + responseBody, err := ioutil.ReadAll(resp.Body)
|
| + resp.Body.Close()
|
| + if err != nil {
|
| + if isTemporaryNetError(err) {
|
| + r.client.Log.Warningf("cipd: temporary error when reading response (%s)", err)
|
| + continue
|
| + }
|
| + return err
|
| + }
|
| + r.client.Log.Debugf("cipd: http %d: %s", resp.StatusCode, body)
|
| +
|
| // Success?
|
| if resp.StatusCode < 300 {
|
| - defer resp.Body.Close()
|
| - return json.NewDecoder(resp.Body).Decode(response)
|
| + return json.Unmarshal(responseBody, response)
|
| }
|
| +
|
| // Fatal error?
|
| - if resp.StatusCode >= 300 && resp.StatusCode < 500 {
|
| - defer resp.Body.Close()
|
| - body, _ := ioutil.ReadAll(resp.Body)
|
| + if resp.StatusCode >= 300 && resp.StatusCode < 500 && resp.StatusCode != 408 {
|
| + if resp.StatusCode == 403 || resp.StatusCode == 401 {
|
| + return ErrAccessDenined
|
| + }
|
| return fmt.Errorf("Unexpected reply (HTTP %d):\n%s", resp.StatusCode, string(body))
|
| }
|
| - // Retry.
|
| - resp.Body.Close()
|
| }
|
| - return fmt.Errorf("Request to %s failed after 10 attempts", url)
|
| +
|
| + return ErrBackendInaccessible
|
| }
|
|
|
| -func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error) {
|
| +func (r *remoteImpl) initiateUpload(sha1 string) (s *UploadSession, err error) {
|
| var reply struct {
|
| Status string `json:"status"`
|
| UploadSessionID string `json:"upload_session_id"`
|
| @@ -151,10 +142,7 @@ func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error
|
| case "ALREADY_UPLOADED":
|
| return
|
| case "SUCCESS":
|
| - s = &uploadSession{
|
| - ID: reply.UploadSessionID,
|
| - URL: reply.UploadURL,
|
| - }
|
| + s = &UploadSession{reply.UploadSessionID, reply.UploadURL}
|
| case "ERROR":
|
| err = fmt.Errorf("Server replied with error: %s", reply.ErrorMessage)
|
| default:
|
| @@ -163,7 +151,7 @@ func (r *remoteService) initiateUpload(sha1 string) (s *uploadSession, err error
|
| return
|
| }
|
|
|
| -func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err error) {
|
| +func (r *remoteImpl) finalizeUpload(sessionID string) (finished bool, err error) {
|
| var reply struct {
|
| Status string `json:"status"`
|
| ErrorMessage string `json:"error_message"`
|
| @@ -174,7 +162,7 @@ func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err err
|
| }
|
| switch reply.Status {
|
| case "MISSING":
|
| - err = errors.New("Upload session is unexpectedly missing")
|
| + err = ErrUploadSessionDied
|
| case "UPLOADING", "VERIFYING":
|
| finished = false
|
| case "PUBLISHED":
|
| @@ -187,8 +175,8 @@ func (r *remoteService) finalizeUpload(sessionID string) (finished bool, err err
|
| return
|
| }
|
|
|
| -func (r *remoteService) registerInstance(packageName, instanceID string) (*registerInstanceResponse, error) {
|
| - endpoint, err := instanceEndpoint(packageName, instanceID)
|
| +func (r *remoteImpl) registerInstance(pin common.Pin) (*registerInstanceResponse, error) {
|
| + endpoint, err := instanceEndpoint(pin)
|
| if err != nil {
|
| return nil, err
|
| }
|
| @@ -205,23 +193,21 @@ func (r *remoteService) registerInstance(packageName, instanceID string) (*regis
|
| }
|
| switch reply.Status {
|
| case "REGISTERED", "ALREADY_REGISTERED":
|
| - info, err := makePackageInstanceInfo(reply.Instance)
|
| + ts, err := convertTimestamp(reply.Instance.RegisteredTs)
|
| if err != nil {
|
| return nil, err
|
| }
|
| return ®isterInstanceResponse{
|
| - AlreadyRegistered: reply.Status == "ALREADY_REGISTERED",
|
| - Info: info,
|
| + alreadyRegistered: reply.Status == "ALREADY_REGISTERED",
|
| + registeredBy: reply.Instance.RegisteredBy,
|
| + registeredTs: ts,
|
| }, nil
|
| case "UPLOAD_FIRST":
|
| if reply.UploadSessionID == "" {
|
| - return nil, errors.New("Server didn't provide upload session ID")
|
| + return nil, ErrNoUploadSessionID
|
| }
|
| return ®isterInstanceResponse{
|
| - UploadSession: &uploadSession{
|
| - ID: reply.UploadSessionID,
|
| - URL: reply.UploadURL,
|
| - },
|
| + uploadSession: &UploadSession{reply.UploadSessionID, reply.UploadURL},
|
| }, nil
|
| case "ERROR":
|
| return nil, errors.New(reply.ErrorMessage)
|
| @@ -229,8 +215,8 @@ func (r *remoteService) registerInstance(packageName, instanceID string) (*regis
|
| return nil, fmt.Errorf("Unexpected register package status: %s", reply.Status)
|
| }
|
|
|
| -func (r *remoteService) fetchInstance(packageName, instanceID string) (*fetchInstanceResponse, error) {
|
| - endpoint, err := instanceEndpoint(packageName, instanceID)
|
| +func (r *remoteImpl) fetchInstance(pin common.Pin) (*fetchInstanceResponse, error) {
|
| + endpoint, err := instanceEndpoint(pin)
|
| if err != nil {
|
| return nil, err
|
| }
|
| @@ -246,25 +232,26 @@ func (r *remoteService) fetchInstance(packageName, instanceID string) (*fetchIns
|
| }
|
| switch reply.Status {
|
| case "SUCCESS":
|
| - info, err := makePackageInstanceInfo(reply.Instance)
|
| + ts, err := convertTimestamp(reply.Instance.RegisteredTs)
|
| if err != nil {
|
| return nil, err
|
| }
|
| return &fetchInstanceResponse{
|
| - FetchURL: reply.FetchURL,
|
| - Info: info,
|
| + fetchURL: reply.FetchURL,
|
| + registeredBy: reply.Instance.RegisteredBy,
|
| + registeredTs: ts,
|
| }, nil
|
| case "PACKAGE_NOT_FOUND":
|
| - return nil, fmt.Errorf("Package '%s' is not registered or you do not have permission to fetch it", packageName)
|
| + return nil, fmt.Errorf("Package '%s' is not registered or you do not have permission to fetch it", pin.PackageName)
|
| case "INSTANCE_NOT_FOUND":
|
| - return nil, fmt.Errorf("Package '%s' doesn't have instance '%s'", packageName, instanceID)
|
| + return nil, fmt.Errorf("Package '%s' doesn't have instance '%s'", pin.PackageName, pin.InstanceID)
|
| case "ERROR":
|
| return nil, errors.New(reply.ErrorMessage)
|
| }
|
| return nil, fmt.Errorf("Unexpected reply status: %s", reply.Status)
|
| }
|
|
|
| -func (r *remoteService) fetchACL(packagePath string) ([]PackageACL, error) {
|
| +func (r *remoteImpl) fetchACL(packagePath string) ([]PackageACL, error) {
|
| endpoint, err := aclEndpoint(packagePath)
|
| if err != nil {
|
| return nil, err
|
| @@ -309,7 +296,7 @@ func (r *remoteService) fetchACL(packagePath string) ([]PackageACL, error) {
|
| return nil, fmt.Errorf("Unexpected reply status: %s", reply.Status)
|
| }
|
|
|
| -func (r *remoteService) modifyACL(packagePath string, changes []PackageACLChange) error {
|
| +func (r *remoteImpl) modifyACL(packagePath string, changes []PackageACLChange) error {
|
| endpoint, err := aclEndpoint(packagePath)
|
| if err != nil {
|
| return err
|
| @@ -349,22 +336,19 @@ func (r *remoteService) modifyACL(packagePath string, changes []PackageACLChange
|
| return fmt.Errorf("Unexpected reply status: %s", reply.Status)
|
| }
|
|
|
| -func (r *remoteService) attachTags(packageName, instanceID string, tags []string) error {
|
| +func (r *remoteImpl) attachTags(pin common.Pin, tags []string) error {
|
| // Tags will be passed in the request body, not via URL.
|
| - endpoint, err := tagsEndpoint(packageName, instanceID, nil)
|
| + endpoint, err := tagsEndpoint(pin, nil)
|
| if err != nil {
|
| return err
|
| }
|
| -
|
| - if len(tags) == 0 {
|
| - return errors.New("At least one tag must be provided")
|
| - }
|
| for _, tag := range tags {
|
| - err = ValidateInstanceTag(tag)
|
| + err = common.ValidateInstanceTag(tag)
|
| if err != nil {
|
| return err
|
| }
|
| }
|
| +
|
| var request struct {
|
| Tags []string `json:"tags"`
|
| }
|
| @@ -391,23 +375,19 @@ func (r *remoteService) attachTags(packageName, instanceID string, tags []string
|
|
|
| ////////////////////////////////////////////////////////////////////////////////
|
|
|
| -func instanceEndpoint(packageName, instanceID string) (string, error) {
|
| - err := ValidatePackageName(packageName)
|
| - if err != nil {
|
| - return "", err
|
| - }
|
| - err = ValidateInstanceID(instanceID)
|
| +func instanceEndpoint(pin common.Pin) (string, error) {
|
| + err := common.ValidatePin(pin)
|
| if err != nil {
|
| return "", err
|
| }
|
| params := url.Values{}
|
| - params.Add("package_name", packageName)
|
| - params.Add("instance_id", instanceID)
|
| + params.Add("package_name", pin.PackageName)
|
| + params.Add("instance_id", pin.InstanceID)
|
| return "repo/v1/instance?" + params.Encode(), nil
|
| }
|
|
|
| func aclEndpoint(packagePath string) (string, error) {
|
| - err := ValidatePackageName(packagePath)
|
| + err := common.ValidatePackageName(packagePath)
|
| if err != nil {
|
| return "", err
|
| }
|
| @@ -416,24 +396,20 @@ func aclEndpoint(packagePath string) (string, error) {
|
| return "repo/v1/acl?" + params.Encode(), nil
|
| }
|
|
|
| -func tagsEndpoint(packageName, instanceID string, tags []string) (string, error) {
|
| - err := ValidatePackageName(packageName)
|
| - if err != nil {
|
| - return "", err
|
| - }
|
| - err = ValidateInstanceID(instanceID)
|
| +func tagsEndpoint(pin common.Pin, tags []string) (string, error) {
|
| + err := common.ValidatePin(pin)
|
| if err != nil {
|
| return "", err
|
| }
|
| for _, tag := range tags {
|
| - err = ValidateInstanceTag(tag)
|
| + err = common.ValidateInstanceTag(tag)
|
| if err != nil {
|
| return "", err
|
| }
|
| }
|
| params := url.Values{}
|
| - params.Add("package_name", packageName)
|
| - params.Add("instance_id", instanceID)
|
| + params.Add("package_name", pin.PackageName)
|
| + params.Add("instance_id", pin.InstanceID)
|
| for _, tag := range tags {
|
| params.Add("tag", tag)
|
| }
|
| @@ -449,17 +425,3 @@ func convertTimestamp(ts string) (time.Time, error) {
|
| }
|
| return time.Unix(0, i*1000), nil
|
| }
|
| -
|
| -func makePackageInstanceInfo(msg packageInstanceMsg) (pi packageInstanceInfo, err error) {
|
| - ts, err := convertTimestamp(msg.RegisteredTs)
|
| - if err != nil {
|
| - return
|
| - }
|
| - pi = packageInstanceInfo{
|
| - PackageName: msg.PackageName,
|
| - InstanceID: msg.InstanceID,
|
| - RegisteredBy: msg.RegisteredBy,
|
| - RegisteredTs: ts,
|
| - }
|
| - return
|
| -}
|
|
|