| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. | 13 // limitations under the License. |
| 14 | 14 |
| 15 // Package swarming implements tasks that run Swarming jobs. | 15 // Package swarming implements tasks that run Swarming jobs. |
| 16 package swarming | 16 package swarming |
| 17 | 17 |
| 18 import ( | 18 import ( |
| 19 "encoding/json" | 19 "encoding/json" |
| 20 "fmt" | 20 "fmt" |
| 21 "net/url" | 21 "net/url" |
| 22 "strings" |
| 22 "time" | 23 "time" |
| 23 | 24 |
| 24 "github.com/golang/protobuf/proto" | 25 "github.com/golang/protobuf/proto" |
| 25 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
| 26 "google.golang.org/api/pubsub/v1" | 27 "google.golang.org/api/pubsub/v1" |
| 27 | 28 |
| 28 "github.com/luci/gae/service/info" | 29 "github.com/luci/gae/service/info" |
| 29 "github.com/luci/luci-go/common/api/swarming/swarming/v1" | 30 "github.com/luci/luci-go/common/api/swarming/swarming/v1" |
| 30 "github.com/luci/luci-go/common/errors" | 31 "github.com/luci/luci-go/common/errors" |
| 31 "github.com/luci/luci-go/common/retry/transient" | 32 "github.com/luci/luci-go/common/retry/transient" |
| (...skipping 22 matching lines...) Expand all Loading... |
| 54 return (*messages.SwarmingTask)(nil) | 55 return (*messages.SwarmingTask)(nil) |
| 55 } | 56 } |
| 56 | 57 |
| 57 // Traits is part of Manager interface. | 58 // Traits is part of Manager interface. |
| 58 func (m TaskManager) Traits() task.Traits { | 59 func (m TaskManager) Traits() task.Traits { |
| 59 return task.Traits{ | 60 return task.Traits{ |
| 60 Multistage: true, // we use task.StatusRunning state | 61 Multistage: true, // we use task.StatusRunning state |
| 61 } | 62 } |
| 62 } | 63 } |
| 63 | 64 |
| 65 func normalizeServerURL(s string) string { |
| 66 if strings.HasPrefix(s, "https://") || strings.HasPrefix(s, "http://") { |
| 67 return s |
| 68 } |
| 69 return "https://" + s |
| 70 } |
| 71 |
| 64 // ValidateProtoMessage is part of Manager interface. | 72 // ValidateProtoMessage is part of Manager interface. |
| 65 func (m TaskManager) ValidateProtoMessage(msg proto.Message) error { | 73 func (m TaskManager) ValidateProtoMessage(msg proto.Message) error { |
| 66 cfg, ok := msg.(*messages.SwarmingTask) | 74 cfg, ok := msg.(*messages.SwarmingTask) |
| 67 if !ok { | 75 if !ok { |
| 68 return fmt.Errorf("wrong type %T, expecting *messages.SwarmingTa
sk", msg) | 76 return fmt.Errorf("wrong type %T, expecting *messages.SwarmingTa
sk", msg) |
| 69 } | 77 } |
| 70 if cfg == nil { | 78 if cfg == nil { |
| 71 return fmt.Errorf("expecting a non-empty SwarmingTask") | 79 return fmt.Errorf("expecting a non-empty SwarmingTask") |
| 72 } | 80 } |
| 73 | 81 |
| 74 // Validate 'server' field. | 82 // Validate 'server' field. |
| 75 if cfg.Server == "" { | 83 if cfg.Server == "" { |
| 76 return fmt.Errorf("field 'server' is required") | 84 return fmt.Errorf("field 'server' is required") |
| 77 } | 85 } |
| 78 » u, err := url.Parse(cfg.Server) | 86 » u, err := url.Parse(normalizeServerURL(cfg.Server)) |
| 79 if err != nil { | 87 if err != nil { |
| 80 return fmt.Errorf("invalid URL %q: %s", cfg.Server, err) | 88 return fmt.Errorf("invalid URL %q: %s", cfg.Server, err) |
| 81 } | 89 } |
| 82 if !u.IsAbs() { | 90 if !u.IsAbs() { |
| 83 return fmt.Errorf("not an absolute url: %q", cfg.Server) | 91 return fmt.Errorf("not an absolute url: %q", cfg.Server) |
| 84 } | 92 } |
| 85 if u.Path != "" { | 93 if u.Path != "" { |
| 86 return fmt.Errorf("not a host root url: %q", cfg.Server) | 94 return fmt.Errorf("not a host root url: %q", cfg.Server) |
| 87 } | 95 } |
| 88 | 96 |
| (...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 200 } | 208 } |
| 201 | 209 |
| 202 // The task priority (or niceness, lower is more aggressive). | 210 // The task priority (or niceness, lower is more aggressive). |
| 203 priority := cfg.Priority | 211 priority := cfg.Priority |
| 204 if priority == 0 { | 212 if priority == 0 { |
| 205 priority = 200 | 213 priority = 200 |
| 206 } | 214 } |
| 207 | 215 |
| 208 // Make sure Swarming can publish PubSub messages, grab token that would | 216 // Make sure Swarming can publish PubSub messages, grab token that would |
| 209 // identify this invocation when receiving PubSub notifications. | 217 // identify this invocation when receiving PubSub notifications. |
| 210 » ctl.DebugLog("Preparing PubSub topic for %q", cfg.Server) | 218 » serverURL := normalizeServerURL(cfg.Server) |
| 211 » topic, authToken, err := ctl.PrepareTopic(c, cfg.Server) | 219 » ctl.DebugLog("Preparing PubSub topic for %q", serverURL) |
| 220 » topic, authToken, err := ctl.PrepareTopic(c, serverURL) |
| 212 if err != nil { | 221 if err != nil { |
| 213 ctl.DebugLog("Failed to prepare PubSub topic - %s", err) | 222 ctl.DebugLog("Failed to prepare PubSub topic - %s", err) |
| 214 return err | 223 return err |
| 215 } | 224 } |
| 216 ctl.DebugLog("PubSub topic is %q", topic) | 225 ctl.DebugLog("PubSub topic is %q", topic) |
| 217 | 226 |
| 218 // Prepare the request. | 227 // Prepare the request. |
| 219 request := swarming.SwarmingRpcsNewTaskRequest{ | 228 request := swarming.SwarmingRpcsNewTaskRequest{ |
| 220 Name: fmt.Sprintf("scheduler:%s/%d", ctl.JobID(), ctl
.InvocationID()), | 229 Name: fmt.Sprintf("scheduler:%s/%d", ctl.JobID(), ctl
.InvocationID()), |
| 221 ExpirationSecs: expirationSecs, | 230 ExpirationSecs: expirationSecs, |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 279 // notifications | 288 // notifications |
| 280 ctl.State().TaskData, err = json.Marshal(&taskData{ | 289 ctl.State().TaskData, err = json.Marshal(&taskData{ |
| 281 SwarmingTaskID: resp.TaskId, | 290 SwarmingTaskID: resp.TaskId, |
| 282 }) | 291 }) |
| 283 if err != nil { | 292 if err != nil { |
| 284 return err | 293 return err |
| 285 } | 294 } |
| 286 | 295 |
| 287 // Successfully launched. | 296 // Successfully launched. |
| 288 ctl.State().Status = task.StatusRunning | 297 ctl.State().Status = task.StatusRunning |
| 289 » ctl.State().ViewURL = fmt.Sprintf("%s/user/task/%s", cfg.Server, resp.Ta
skId) | 298 » ctl.State().ViewURL = fmt.Sprintf("%s/user/task/%s", serverURL, resp.Tas
kId) |
| 290 ctl.DebugLog("Task URL: %s", ctl.State().ViewURL) | 299 ctl.DebugLog("Task URL: %s", ctl.State().ViewURL) |
| 291 | 300 |
| 292 // Maybe the task was already finished? Can only happen when 'idempotent
' is | 301 // Maybe the task was already finished? Can only happen when 'idempotent
' is |
| 293 // set to true (which we don't do currently), but handle this case here
for | 302 // set to true (which we don't do currently), but handle this case here
for |
| 294 // completeness anyway. | 303 // completeness anyway. |
| 295 if resp.TaskResult != nil { | 304 if resp.TaskResult != nil { |
| 296 ctl.DebugLog("Task request was deduplicated") | 305 ctl.DebugLog("Task request was deduplicated") |
| 297 m.handleTaskResult(c, ctl, resp.TaskResult) | 306 m.handleTaskResult(c, ctl, resp.TaskResult) |
| 298 } | 307 } |
| 299 | 308 |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 333 func (m TaskManager) createSwarmingService(c context.Context, ctl task.Controlle
r) (*swarming.Service, error) { | 342 func (m TaskManager) createSwarmingService(c context.Context, ctl task.Controlle
r) (*swarming.Service, error) { |
| 334 client, err := ctl.GetClient(c, time.Minute) | 343 client, err := ctl.GetClient(c, time.Minute) |
| 335 if err != nil { | 344 if err != nil { |
| 336 return nil, err | 345 return nil, err |
| 337 } | 346 } |
| 338 service, err := swarming.New(client) | 347 service, err := swarming.New(client) |
| 339 if err != nil { | 348 if err != nil { |
| 340 return nil, err | 349 return nil, err |
| 341 } | 350 } |
| 342 cfg := ctl.Task().(*messages.SwarmingTask) | 351 cfg := ctl.Task().(*messages.SwarmingTask) |
| 343 » service.BasePath = cfg.Server + "/_ah/api/swarming/v1/" | 352 » service.BasePath = normalizeServerURL(cfg.Server) + "/_ah/api/swarming/v
1/" |
| 344 return service, nil | 353 return service, nil |
| 345 } | 354 } |
| 346 | 355 |
| 347 // checkTaskStatusLater schedules a delayed call to checkTaskStatus if the | 356 // checkTaskStatusLater schedules a delayed call to checkTaskStatus if the |
| 348 // invocation is still running. | 357 // invocation is still running. |
| 349 // | 358 // |
| 350 // This is a fallback mechanism in case PubSub notifications are delayed or | 359 // This is a fallback mechanism in case PubSub notifications are delayed or |
| 351 // lost for some reason. | 360 // lost for some reason. |
| 352 func (m TaskManager) checkTaskStatusLater(c context.Context, ctl task.Controller
) { | 361 func (m TaskManager) checkTaskStatusLater(c context.Context, ctl task.Controller
) { |
| 353 // TODO(vadimsh): Make the check interval configurable? | 362 // TODO(vadimsh): Make the check interval configurable? |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 416 r.State, r.Failure, r.InternalFailure) | 425 r.State, r.Failure, r.InternalFailure) |
| 417 switch { | 426 switch { |
| 418 case r.State == "PENDING" || r.State == "RUNNING": | 427 case r.State == "PENDING" || r.State == "RUNNING": |
| 419 return // do nothing | 428 return // do nothing |
| 420 case r.State == "COMPLETED" && !(r.Failure || r.InternalFailure): | 429 case r.State == "COMPLETED" && !(r.Failure || r.InternalFailure): |
| 421 ctl.State().Status = task.StatusSucceeded | 430 ctl.State().Status = task.StatusSucceeded |
| 422 default: | 431 default: |
| 423 ctl.State().Status = task.StatusFailed | 432 ctl.State().Status = task.StatusFailed |
| 424 } | 433 } |
| 425 } | 434 } |
| OLD | NEW |