Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(167)

Side by Side Diff: scheduler/appengine/task/swarming/swarming.go

Issue 2991213003: scheduler: Make 'https://' optional in 'server' config field. (Closed)
Patch Set: Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. 1 // Copyright 2015 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and 12 // See the License for the specific language governing permissions and
13 // limitations under the License. 13 // limitations under the License.
14 14
15 // Package swarming implements tasks that run Swarming jobs. 15 // Package swarming implements tasks that run Swarming jobs.
16 package swarming 16 package swarming
17 17
18 import ( 18 import (
19 "encoding/json" 19 "encoding/json"
20 "fmt" 20 "fmt"
21 "net/url" 21 "net/url"
22 "strings"
22 "time" 23 "time"
23 24
24 "github.com/golang/protobuf/proto" 25 "github.com/golang/protobuf/proto"
25 "golang.org/x/net/context" 26 "golang.org/x/net/context"
26 "google.golang.org/api/pubsub/v1" 27 "google.golang.org/api/pubsub/v1"
27 28
28 "github.com/luci/gae/service/info" 29 "github.com/luci/gae/service/info"
29 "github.com/luci/luci-go/common/api/swarming/swarming/v1" 30 "github.com/luci/luci-go/common/api/swarming/swarming/v1"
30 "github.com/luci/luci-go/common/errors" 31 "github.com/luci/luci-go/common/errors"
31 "github.com/luci/luci-go/common/retry/transient" 32 "github.com/luci/luci-go/common/retry/transient"
(...skipping 22 matching lines...) Expand all
54 return (*messages.SwarmingTask)(nil) 55 return (*messages.SwarmingTask)(nil)
55 } 56 }
56 57
57 // Traits is part of Manager interface. 58 // Traits is part of Manager interface.
58 func (m TaskManager) Traits() task.Traits { 59 func (m TaskManager) Traits() task.Traits {
59 return task.Traits{ 60 return task.Traits{
60 Multistage: true, // we use task.StatusRunning state 61 Multistage: true, // we use task.StatusRunning state
61 } 62 }
62 } 63 }
63 64
65 func normalizeServerURL(s string) string {
66 if strings.HasPrefix(s, "https://") || strings.HasPrefix(s, "http://") {
67 return s
68 }
69 return "https://" + s
70 }
71
64 // ValidateProtoMessage is part of Manager interface. 72 // ValidateProtoMessage is part of Manager interface.
65 func (m TaskManager) ValidateProtoMessage(msg proto.Message) error { 73 func (m TaskManager) ValidateProtoMessage(msg proto.Message) error {
66 cfg, ok := msg.(*messages.SwarmingTask) 74 cfg, ok := msg.(*messages.SwarmingTask)
67 if !ok { 75 if !ok {
68 return fmt.Errorf("wrong type %T, expecting *messages.SwarmingTa sk", msg) 76 return fmt.Errorf("wrong type %T, expecting *messages.SwarmingTa sk", msg)
69 } 77 }
70 if cfg == nil { 78 if cfg == nil {
71 return fmt.Errorf("expecting a non-empty SwarmingTask") 79 return fmt.Errorf("expecting a non-empty SwarmingTask")
72 } 80 }
73 81
74 // Validate 'server' field. 82 // Validate 'server' field.
75 if cfg.Server == "" { 83 if cfg.Server == "" {
76 return fmt.Errorf("field 'server' is required") 84 return fmt.Errorf("field 'server' is required")
77 } 85 }
78 » u, err := url.Parse(cfg.Server) 86 » u, err := url.Parse(normalizeServerURL(cfg.Server))
79 if err != nil { 87 if err != nil {
80 return fmt.Errorf("invalid URL %q: %s", cfg.Server, err) 88 return fmt.Errorf("invalid URL %q: %s", cfg.Server, err)
81 } 89 }
82 if !u.IsAbs() { 90 if !u.IsAbs() {
83 return fmt.Errorf("not an absolute url: %q", cfg.Server) 91 return fmt.Errorf("not an absolute url: %q", cfg.Server)
84 } 92 }
85 if u.Path != "" { 93 if u.Path != "" {
86 return fmt.Errorf("not a host root url: %q", cfg.Server) 94 return fmt.Errorf("not a host root url: %q", cfg.Server)
87 } 95 }
88 96
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
200 } 208 }
201 209
202 // The task priority (or niceness, lower is more aggressive). 210 // The task priority (or niceness, lower is more aggressive).
203 priority := cfg.Priority 211 priority := cfg.Priority
204 if priority == 0 { 212 if priority == 0 {
205 priority = 200 213 priority = 200
206 } 214 }
207 215
208 // Make sure Swarming can publish PubSub messages, grab token that would 216 // Make sure Swarming can publish PubSub messages, grab token that would
209 // identify this invocation when receiving PubSub notifications. 217 // identify this invocation when receiving PubSub notifications.
210 » ctl.DebugLog("Preparing PubSub topic for %q", cfg.Server) 218 » serverURL := normalizeServerURL(cfg.Server)
211 » topic, authToken, err := ctl.PrepareTopic(c, cfg.Server) 219 » ctl.DebugLog("Preparing PubSub topic for %q", serverURL)
220 » topic, authToken, err := ctl.PrepareTopic(c, serverURL)
212 if err != nil { 221 if err != nil {
213 ctl.DebugLog("Failed to prepare PubSub topic - %s", err) 222 ctl.DebugLog("Failed to prepare PubSub topic - %s", err)
214 return err 223 return err
215 } 224 }
216 ctl.DebugLog("PubSub topic is %q", topic) 225 ctl.DebugLog("PubSub topic is %q", topic)
217 226
218 // Prepare the request. 227 // Prepare the request.
219 request := swarming.SwarmingRpcsNewTaskRequest{ 228 request := swarming.SwarmingRpcsNewTaskRequest{
220 Name: fmt.Sprintf("scheduler:%s/%d", ctl.JobID(), ctl .InvocationID()), 229 Name: fmt.Sprintf("scheduler:%s/%d", ctl.JobID(), ctl .InvocationID()),
221 ExpirationSecs: expirationSecs, 230 ExpirationSecs: expirationSecs,
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
279 // notifications 288 // notifications
280 ctl.State().TaskData, err = json.Marshal(&taskData{ 289 ctl.State().TaskData, err = json.Marshal(&taskData{
281 SwarmingTaskID: resp.TaskId, 290 SwarmingTaskID: resp.TaskId,
282 }) 291 })
283 if err != nil { 292 if err != nil {
284 return err 293 return err
285 } 294 }
286 295
287 // Successfully launched. 296 // Successfully launched.
288 ctl.State().Status = task.StatusRunning 297 ctl.State().Status = task.StatusRunning
289 » ctl.State().ViewURL = fmt.Sprintf("%s/user/task/%s", cfg.Server, resp.Ta skId) 298 » ctl.State().ViewURL = fmt.Sprintf("%s/user/task/%s", serverURL, resp.Tas kId)
290 ctl.DebugLog("Task URL: %s", ctl.State().ViewURL) 299 ctl.DebugLog("Task URL: %s", ctl.State().ViewURL)
291 300
292 // Maybe the task was already finished? Can only happen when 'idempotent ' is 301 // Maybe the task was already finished? Can only happen when 'idempotent ' is
293 // set to true (which we don't do currently), but handle this case here for 302 // set to true (which we don't do currently), but handle this case here for
294 // completeness anyway. 303 // completeness anyway.
295 if resp.TaskResult != nil { 304 if resp.TaskResult != nil {
296 ctl.DebugLog("Task request was deduplicated") 305 ctl.DebugLog("Task request was deduplicated")
297 m.handleTaskResult(c, ctl, resp.TaskResult) 306 m.handleTaskResult(c, ctl, resp.TaskResult)
298 } 307 }
299 308
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
333 func (m TaskManager) createSwarmingService(c context.Context, ctl task.Controlle r) (*swarming.Service, error) { 342 func (m TaskManager) createSwarmingService(c context.Context, ctl task.Controlle r) (*swarming.Service, error) {
334 client, err := ctl.GetClient(c, time.Minute) 343 client, err := ctl.GetClient(c, time.Minute)
335 if err != nil { 344 if err != nil {
336 return nil, err 345 return nil, err
337 } 346 }
338 service, err := swarming.New(client) 347 service, err := swarming.New(client)
339 if err != nil { 348 if err != nil {
340 return nil, err 349 return nil, err
341 } 350 }
342 cfg := ctl.Task().(*messages.SwarmingTask) 351 cfg := ctl.Task().(*messages.SwarmingTask)
343 » service.BasePath = cfg.Server + "/_ah/api/swarming/v1/" 352 » service.BasePath = normalizeServerURL(cfg.Server) + "/_ah/api/swarming/v 1/"
344 return service, nil 353 return service, nil
345 } 354 }
346 355
347 // checkTaskStatusLater schedules a delayed call to checkTaskStatus if the 356 // checkTaskStatusLater schedules a delayed call to checkTaskStatus if the
348 // invocation is still running. 357 // invocation is still running.
349 // 358 //
350 // This is a fallback mechanism in case PubSub notifications are delayed or 359 // This is a fallback mechanism in case PubSub notifications are delayed or
351 // lost for some reason. 360 // lost for some reason.
352 func (m TaskManager) checkTaskStatusLater(c context.Context, ctl task.Controller ) { 361 func (m TaskManager) checkTaskStatusLater(c context.Context, ctl task.Controller ) {
353 // TODO(vadimsh): Make the check interval configurable? 362 // TODO(vadimsh): Make the check interval configurable?
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
416 r.State, r.Failure, r.InternalFailure) 425 r.State, r.Failure, r.InternalFailure)
417 switch { 426 switch {
418 case r.State == "PENDING" || r.State == "RUNNING": 427 case r.State == "PENDING" || r.State == "RUNNING":
419 return // do nothing 428 return // do nothing
420 case r.State == "COMPLETED" && !(r.Failure || r.InternalFailure): 429 case r.State == "COMPLETED" && !(r.Failure || r.InternalFailure):
421 ctl.State().Status = task.StatusSucceeded 430 ctl.State().Status = task.StatusSucceeded
422 default: 431 default:
423 ctl.State().Status = task.StatusFailed 432 ctl.State().Status = task.StatusFailed
424 } 433 }
425 } 434 }
OLDNEW
« no previous file with comments | « scheduler/appengine/task/buildbucket/buildbucket_test.go ('k') | scheduler/appengine/task/swarming/swarming_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698