| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package swarming | 5 package swarming |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| 11 "net/url" | 11 "net/url" |
| 12 "strings" | 12 "strings" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 16 | 16 |
| 17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" | 17 swarming "github.com/luci/luci-go/common/api/swarming/swarming/v1" |
| 18 "github.com/luci/luci-go/common/clock" |
| 18 "github.com/luci/luci-go/common/errors" | 19 "github.com/luci/luci-go/common/errors" |
| 19 "github.com/luci/luci-go/common/logging" | 20 "github.com/luci/luci-go/common/logging" |
| 20 "github.com/luci/luci-go/common/proto/google" | 21 "github.com/luci/luci-go/common/proto/google" |
| 21 miloProto "github.com/luci/luci-go/common/proto/milo" | 22 miloProto "github.com/luci/luci-go/common/proto/milo" |
| 22 "github.com/luci/luci-go/common/sync/parallel" | 23 "github.com/luci/luci-go/common/sync/parallel" |
| 23 "github.com/luci/luci-go/logdog/client/annotee" | 24 "github.com/luci/luci-go/logdog/client/annotee" |
| 24 "github.com/luci/luci-go/logdog/client/coordinator" | 25 "github.com/luci/luci-go/logdog/client/coordinator" |
| 25 "github.com/luci/luci-go/logdog/common/types" | 26 "github.com/luci/luci-go/logdog/common/types" |
| 26 "github.com/luci/luci-go/milo/api/resp" | 27 "github.com/luci/luci-go/milo/api/resp" |
| 27 "github.com/luci/luci-go/milo/appengine/logdog" | 28 "github.com/luci/luci-go/milo/appengine/logdog" |
| 28 "github.com/luci/luci-go/server/auth" | 29 "github.com/luci/luci-go/server/auth" |
| 29 ) | 30 ) |
| 30 | 31 |
| 31 // errNotMiloJob is returned if a Swarming task is fetched that does not self- | 32 // errNotMiloJob is returned if a Swarming task is fetched that does not self- |
| 32 // identify as a Milo job. | 33 // identify as a Milo job. |
| 33 var errNotMiloJob = errors.New("Not a Milo Job") | 34 var errNotMiloJob = errors.New("Not a Milo Job") |
| 34 | 35 |
| 35 // SwarmingTimeLayout is time layout used by swarming. | 36 // SwarmingTimeLayout is time layout used by swarming. |
| 36 const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999" | 37 const SwarmingTimeLayout = "2006-01-02T15:04:05.999999999" |
| 37 | 38 |
| 39 // logDogFetchTimeout is the amount of time to wait while fetching a LogDog |
| 40 // stream before we time out the fetch. |
| 41 const logDogFetchTimeout = 30 * time.Second |
| 42 |
| 38 // Swarming task states.. | 43 // Swarming task states.. |
| 39 const ( | 44 const ( |
| 40 // TaskRunning means task is running. | 45 // TaskRunning means task is running. |
| 41 TaskRunning = "RUNNING" | 46 TaskRunning = "RUNNING" |
| 42 // TaskPending means task didn't start yet. | 47 // TaskPending means task didn't start yet. |
| 43 TaskPending = "PENDING" | 48 TaskPending = "PENDING" |
| 44 // TaskExpired means task expired and did not start. | 49 // TaskExpired means task expired and did not start. |
| 45 TaskExpired = "EXPIRED" | 50 TaskExpired = "EXPIRED" |
| 46 // TaskTimedOut means task started, but took too long. | 51 // TaskTimedOut means task started, but took too long. |
| 47 TaskTimedOut = "TIMED_OUT" | 52 TaskTimedOut = "TIMED_OUT" |
| (...skipping 457 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 505 func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService,
linkBase, taskID string) (*resp.MiloBuild, error) { | 510 func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService,
linkBase, taskID string) (*resp.MiloBuild, error) { |
| 506 // Fetch the data from Swarming | 511 // Fetch the data from Swarming |
| 507 var logDogStreamAddr *types.StreamAddr | 512 var logDogStreamAddr *types.StreamAddr |
| 508 | 513 |
| 509 fetchParams := swarmingFetchParams{ | 514 fetchParams := swarmingFetchParams{ |
| 510 fetchRes: true, | 515 fetchRes: true, |
| 511 fetchLog: true, | 516 fetchLog: true, |
| 512 | 517 |
| 513 // Cancel if LogDog annotation stream parameters are present in
the tag set. | 518 // Cancel if LogDog annotation stream parameters are present in
the tag set. |
| 514 taskResCallback: func(res *swarming.SwarmingRpcsTaskResult) (can
celLogs bool) { | 519 taskResCallback: func(res *swarming.SwarmingRpcsTaskResult) (can
celLogs bool) { |
| 520 // If the build hasn't started yet, then there is no Log
Dog log stream to |
| 521 // render. |
| 522 switch res.State { |
| 523 case TaskPending, TaskExpired: |
| 524 return false |
| 525 |
| 526 case TaskCanceled: |
| 527 // If the task wasn't created, then it wasn't st
arted. |
| 528 if res.CreatedTs == "" { |
| 529 return false |
| 530 } |
| 531 } |
| 532 |
| 533 // The task started ... is it using LogDog for logging? |
| 515 tags := swarmingTags(res.Tags) | 534 tags := swarmingTags(res.Tags) |
| 516 | 535 |
| 517 var err error | 536 var err error |
| 518 if logDogStreamAddr, err = resolveLogDogStreamAddrFromTa
gs(tags, res.TaskId, res.TryNumber); err != nil { | 537 if logDogStreamAddr, err = resolveLogDogStreamAddrFromTa
gs(tags, res.TaskId, res.TryNumber); err != nil { |
| 519 logging.WithError(err).Debugf(c, "Not using LogD
og annotation stream.") | 538 logging.WithError(err).Debugf(c, "Not using LogD
og annotation stream.") |
| 520 return false | 539 return false |
| 521 } | 540 } |
| 522 return true | 541 return true |
| 523 }, | 542 }, |
| 524 } | 543 } |
| (...skipping 13 matching lines...) Expand all Loading... |
| 538 // As a fallback, we will try and parse the Swarming task's output for | 557 // As a fallback, we will try and parse the Swarming task's output for |
| 539 // annotations. | 558 // annotations. |
| 540 switch { | 559 switch { |
| 541 case logDogStreamAddr != nil: | 560 case logDogStreamAddr != nil: |
| 542 // If the LogDog stream is available, load the step from that. | 561 // If the LogDog stream is available, load the step from that. |
| 543 as, err := bl.newEmptyAnnotationStream(c, logDogStreamAddr) | 562 as, err := bl.newEmptyAnnotationStream(c, logDogStreamAddr) |
| 544 if err != nil { | 563 if err != nil { |
| 545 return nil, errors.Annotate(err).Reason("failed to creat
e LogDog annotation stream").Err() | 564 return nil, errors.Annotate(err).Reason("failed to creat
e LogDog annotation stream").Err() |
| 546 } | 565 } |
| 547 | 566 |
| 548 if s, err = as.Fetch(c); err != nil { | |
| 549 return nil, errors.Annotate(err).Reason("failed to load
LogDog annotation stream").Err() | |
| 550 } | |
| 551 | |
| 552 prefix, _ := logDogStreamAddr.Path.Split() | 567 prefix, _ := logDogStreamAddr.Path.Split() |
| 553 ub = &logdog.ViewerURLBuilder{ | 568 ub = &logdog.ViewerURLBuilder{ |
| 554 Host: logDogStreamAddr.Host, | 569 Host: logDogStreamAddr.Host, |
| 555 Prefix: prefix, | 570 Prefix: prefix, |
| 556 Project: logDogStreamAddr.Project, | 571 Project: logDogStreamAddr.Project, |
| 557 } | 572 } |
| 558 | 573 |
| 574 fetchCtx, cancelFunc := clock.WithTimeout(c, logDogFetchTimeout) |
| 575 defer cancelFunc() |
| 576 |
| 577 if s, err = as.Fetch(fetchCtx); err != nil { |
| 578 logging.Fields{ |
| 579 logging.ErrorKey: err, |
| 580 "addr": logDogStreamAddr, |
| 581 }.Errorf(c, "Failed to load LogDog annotation stream.") |
| 582 build.Components = append(build.Components, infraFailure
Component("LogDog load error", err)) |
| 583 break |
| 584 } |
| 585 |
| 559 case fr.log != "": | 586 case fr.log != "": |
| 560 // Decode the data using annotee. The logdog stream returned her
e is assumed | 587 // Decode the data using annotee. The logdog stream returned her
e is assumed |
| 561 // to be consistent, which is why the following block of code ar
e not | 588 // to be consistent, which is why the following block of code ar
e not |
| 562 // expected to ever err out. | 589 // expected to ever err out. |
| 563 var err error | 590 var err error |
| 564 lds, err = streamsFromAnnotatedLog(c, fr.log) | 591 lds, err = streamsFromAnnotatedLog(c, fr.log) |
| 565 if err != nil { | 592 if err != nil { |
| 566 » » » build.Components = []*resp.BuildComponent{{ | 593 » » » comp := infraFailureComponent("Milo annotation parser",
err) |
| 567 » » » » Type: resp.Summary, | 594 » » » comp.SubLink = append(comp.SubLink, &resp.Link{ |
| 568 » » » » Label: "Milo annotation parser", | 595 » » » » Label: "swarming task", |
| 569 » » » » Text: []string{err.Error()}, | 596 » » » » URL: taskPageURL(svc.getHost(), taskID), |
| 570 » » » » Status: resp.InfraFailure, | 597 » » » }) |
| 571 » » » » SubLink: []*resp.Link{{ | 598 » » » build.Components = append(build.Components, comp) |
| 572 » » » » » Label: "swarming task", | |
| 573 » » » » » URL: taskPageURL(svc.getHost(), taskID
), | |
| 574 » » » » }}, | |
| 575 » » » }} | |
| 576 } | 599 } |
| 577 | 600 |
| 578 if lds != nil && lds.MainStream != nil && lds.MainStream.Data !=
nil { | 601 if lds != nil && lds.MainStream != nil && lds.MainStream.Data !=
nil { |
| 579 s = lds.MainStream.Data | 602 s = lds.MainStream.Data |
| 580 } | 603 } |
| 581 ub = swarmingURLBuilder(linkBase) | 604 ub = swarmingURLBuilder(linkBase) |
| 582 | 605 |
| 583 default: | 606 default: |
| 584 s = &miloProto.Step{} | 607 s = &miloProto.Step{} |
| 585 ub = swarmingURLBuilder(linkBase) | 608 ub = swarmingURLBuilder(linkBase) |
| 586 } | 609 } |
| 587 | 610 |
| 588 if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil { | 611 if err := addTaskToMiloStep(c, svc.getHost(), fr.res, s); err != nil { |
| 589 return nil, err | 612 return nil, err |
| 590 } | 613 } |
| 591 logdog.AddLogDogToBuild(c, ub, s, &build) | 614 logdog.AddLogDogToBuild(c, ub, s, &build) |
| 592 | 615 |
| 593 if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil { | 616 if err := addTaskToBuild(c, svc.getHost(), fr.res, &build); err != nil { |
| 594 return nil, err | 617 return nil, err |
| 595 } | 618 } |
| 596 | 619 |
| 597 return &build, nil | 620 return &build, nil |
| 598 } | 621 } |
| 599 | 622 |
| 623 func infraFailureComponent(label string, err error) *resp.BuildComponent { |
| 624 return &resp.BuildComponent{ |
| 625 Type: resp.Summary, |
| 626 Label: label, |
| 627 Text: []string{err.Error()}, |
| 628 Status: resp.InfraFailure, |
| 629 } |
| 630 } |
| 631 |
| 600 func isMiloJob(tags []string) bool { | 632 func isMiloJob(tags []string) bool { |
| 601 for _, t := range tags { | 633 for _, t := range tags { |
| 602 if t == "allow_milo:1" { | 634 if t == "allow_milo:1" { |
| 603 return true | 635 return true |
| 604 } | 636 } |
| 605 } | 637 } |
| 606 return false | 638 return false |
| 607 } | 639 } |
| 608 | 640 |
| 609 // taskPageURL returns a URL to a human-consumable page of a swarming task. | 641 // taskPageURL returns a URL to a human-consumable page of a swarming task. |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 665 for _, tag := range v { | 697 for _, tag := range v { |
| 666 var value string | 698 var value string |
| 667 parts := strings.SplitN(tag, ":", 2) | 699 parts := strings.SplitN(tag, ":", 2) |
| 668 if len(parts) == 2 { | 700 if len(parts) == 2 { |
| 669 value = parts[1] | 701 value = parts[1] |
| 670 } | 702 } |
| 671 res[parts[0]] = value | 703 res[parts[0]] = value |
| 672 } | 704 } |
| 673 return res | 705 return res |
| 674 } | 706 } |
| OLD | NEW |