| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. | 13 // limitations under the License. |
| 14 | 14 |
| 15 // Package buildbucket implements tasks that run Buildbucket jobs. | 15 // Package buildbucket implements tasks that run Buildbucket jobs. |
| 16 package buildbucket | 16 package buildbucket |
| 17 | 17 |
| 18 import ( | 18 import ( |
| 19 "encoding/json" | 19 "encoding/json" |
| 20 "fmt" | 20 "fmt" |
| 21 "net/url" | 21 "net/url" |
| 22 "strings" |
| 22 "time" | 23 "time" |
| 23 | 24 |
| 24 "github.com/golang/protobuf/proto" | 25 "github.com/golang/protobuf/proto" |
| 25 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
| 26 "google.golang.org/api/pubsub/v1" | 27 "google.golang.org/api/pubsub/v1" |
| 27 | 28 |
| 28 "github.com/luci/gae/service/info" | 29 "github.com/luci/gae/service/info" |
| 29 "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1" | 30 "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1" |
| 30 "github.com/luci/luci-go/common/errors" | 31 "github.com/luci/luci-go/common/errors" |
| 31 "github.com/luci/luci-go/common/retry/transient" | 32 "github.com/luci/luci-go/common/retry/transient" |
| (...skipping 22 matching lines...) Expand all Loading... |
| 54 return (*messages.BuildbucketTask)(nil) | 55 return (*messages.BuildbucketTask)(nil) |
| 55 } | 56 } |
| 56 | 57 |
| 57 // Traits is part of Manager interface. | 58 // Traits is part of Manager interface. |
| 58 func (m TaskManager) Traits() task.Traits { | 59 func (m TaskManager) Traits() task.Traits { |
| 59 return task.Traits{ | 60 return task.Traits{ |
| 60 Multistage: true, // we use task.StatusRunning state | 61 Multistage: true, // we use task.StatusRunning state |
| 61 } | 62 } |
| 62 } | 63 } |
| 63 | 64 |
| 65 func normalizeServerURL(s string) string { |
| 66 if strings.HasPrefix(s, "https://") || strings.HasPrefix(s, "http://") { |
| 67 return s |
| 68 } |
| 69 return "https://" + s |
| 70 } |
| 71 |
| 64 // ValidateProtoMessage is part of Manager interface. | 72 // ValidateProtoMessage is part of Manager interface. |
| 65 func (m TaskManager) ValidateProtoMessage(msg proto.Message) error { | 73 func (m TaskManager) ValidateProtoMessage(msg proto.Message) error { |
| 66 cfg, ok := msg.(*messages.BuildbucketTask) | 74 cfg, ok := msg.(*messages.BuildbucketTask) |
| 67 if !ok { | 75 if !ok { |
| 68 return fmt.Errorf("wrong type %T, expecting *messages.Buildbucke
tTask", msg) | 76 return fmt.Errorf("wrong type %T, expecting *messages.Buildbucke
tTask", msg) |
| 69 } | 77 } |
| 70 if cfg == nil { | 78 if cfg == nil { |
| 71 return fmt.Errorf("expecting a non-empty BuildbucketTask") | 79 return fmt.Errorf("expecting a non-empty BuildbucketTask") |
| 72 } | 80 } |
| 73 | 81 |
| 74 // Validate 'server' field. | 82 // Validate 'server' field. |
| 75 if cfg.Server == "" { | 83 if cfg.Server == "" { |
| 76 return fmt.Errorf("field 'server' is required") | 84 return fmt.Errorf("field 'server' is required") |
| 77 } | 85 } |
| 78 » u, err := url.Parse(cfg.Server) | 86 » u, err := url.Parse(normalizeServerURL(cfg.Server)) |
| 79 if err != nil { | 87 if err != nil { |
| 80 return fmt.Errorf("invalid URL %q: %s", cfg.Server, err) | 88 return fmt.Errorf("invalid URL %q: %s", cfg.Server, err) |
| 81 } | 89 } |
| 82 if !u.IsAbs() { | 90 if !u.IsAbs() { |
| 83 return fmt.Errorf("not an absolute url: %q", cfg.Server) | 91 return fmt.Errorf("not an absolute url: %q", cfg.Server) |
| 84 } | 92 } |
| 85 if u.Path != "" { | 93 if u.Path != "" { |
| 86 return fmt.Errorf("not a host root url: %q", cfg.Server) | 94 return fmt.Errorf("not a host root url: %q", cfg.Server) |
| 87 } | 95 } |
| 88 | 96 |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 157 for _, kv := range utils.UnpackKVList(cfg.Properties, ':') { | 165 for _, kv := range utils.UnpackKVList(cfg.Properties, ':') { |
| 158 params.Properties[kv.Key] = kv.Value | 166 params.Properties[kv.Key] = kv.Value |
| 159 } | 167 } |
| 160 paramsJSON, err := json.Marshal(¶ms) | 168 paramsJSON, err := json.Marshal(¶ms) |
| 161 if err != nil { | 169 if err != nil { |
| 162 return fmt.Errorf("failed to marshal parameters JSON - %s", err) | 170 return fmt.Errorf("failed to marshal parameters JSON - %s", err) |
| 163 } | 171 } |
| 164 | 172 |
| 165 // Make sure Buildbucket can publish PubSub messages, grab token that wo
uld | 173 // Make sure Buildbucket can publish PubSub messages, grab token that wo
uld |
| 166 // identify this invocation when receiving PubSub notifications. | 174 // identify this invocation when receiving PubSub notifications. |
| 167 » ctl.DebugLog("Preparing PubSub topic for %q", cfg.Server) | 175 » serverURL := normalizeServerURL(cfg.Server) |
| 168 » topic, authToken, err := ctl.PrepareTopic(c, cfg.Server) | 176 » ctl.DebugLog("Preparing PubSub topic for %q", serverURL) |
| 177 » topic, authToken, err := ctl.PrepareTopic(c, serverURL) |
| 169 if err != nil { | 178 if err != nil { |
| 170 ctl.DebugLog("Failed to prepare PubSub topic - %s", err) | 179 ctl.DebugLog("Failed to prepare PubSub topic - %s", err) |
| 171 return err | 180 return err |
| 172 } | 181 } |
| 173 ctl.DebugLog("PubSub topic is %q", topic) | 182 ctl.DebugLog("PubSub topic is %q", topic) |
| 174 | 183 |
| 175 // Prepare the request. | 184 // Prepare the request. |
| 176 request := buildbucket.ApiPutRequestMessage{ | 185 request := buildbucket.ApiPutRequestMessage{ |
| 177 Bucket: cfg.Bucket, | 186 Bucket: cfg.Bucket, |
| 178 ClientOperationId: fmt.Sprintf("%d", ctl.InvocationNonce()), | 187 ClientOperationId: fmt.Sprintf("%d", ctl.InvocationNonce()), |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 277 func (m TaskManager) createBuildbucketService(c context.Context, ctl task.Contro
ller) (*buildbucket.Service, error) { | 286 func (m TaskManager) createBuildbucketService(c context.Context, ctl task.Contro
ller) (*buildbucket.Service, error) { |
| 278 client, err := ctl.GetClient(c, time.Minute) | 287 client, err := ctl.GetClient(c, time.Minute) |
| 279 if err != nil { | 288 if err != nil { |
| 280 return nil, err | 289 return nil, err |
| 281 } | 290 } |
| 282 service, err := buildbucket.New(client) | 291 service, err := buildbucket.New(client) |
| 283 if err != nil { | 292 if err != nil { |
| 284 return nil, err | 293 return nil, err |
| 285 } | 294 } |
| 286 cfg := ctl.Task().(*messages.BuildbucketTask) | 295 cfg := ctl.Task().(*messages.BuildbucketTask) |
| 287 » service.BasePath = cfg.Server + "/_ah/api/buildbucket/v1/" | 296 » service.BasePath = normalizeServerURL(cfg.Server) + "/_ah/api/buildbucke
t/v1/" |
| 288 return service, nil | 297 return service, nil |
| 289 } | 298 } |
| 290 | 299 |
| 291 // checkBuildStatusLater schedules a delayed call to checkBuildStatus if the | 300 // checkBuildStatusLater schedules a delayed call to checkBuildStatus if the |
| 292 // invocation is still running. | 301 // invocation is still running. |
| 293 // | 302 // |
| 294 // This is a fallback mechanism in case PubSub notifications are delayed or | 303 // This is a fallback mechanism in case PubSub notifications are delayed or |
| 295 // lost for some reason. | 304 // lost for some reason. |
| 296 func (m TaskManager) checkBuildStatusLater(c context.Context, ctl task.Controlle
r) { | 305 func (m TaskManager) checkBuildStatusLater(c context.Context, ctl task.Controlle
r) { |
| 297 // TODO(vadimsh): Make the check interval configurable? | 306 // TODO(vadimsh): Make the check interval configurable? |
| (...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 370 r.Id, r.Status, r.Result, r.FailureReason, r.CancelationReason) | 379 r.Id, r.Status, r.Result, r.FailureReason, r.CancelationReason) |
| 371 switch { | 380 switch { |
| 372 case r.Status == "SCHEDULED" || r.Status == "STARTED": | 381 case r.Status == "SCHEDULED" || r.Status == "STARTED": |
| 373 return // do nothing | 382 return // do nothing |
| 374 case r.Status == "COMPLETED" && r.Result == "SUCCESS": | 383 case r.Status == "COMPLETED" && r.Result == "SUCCESS": |
| 375 ctl.State().Status = task.StatusSucceeded | 384 ctl.State().Status = task.StatusSucceeded |
| 376 default: | 385 default: |
| 377 ctl.State().Status = task.StatusFailed | 386 ctl.State().Status = task.StatusFailed |
| 378 } | 387 } |
| 379 } | 388 } |
| OLD | NEW |