OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 // Package engine implements the core logic of the scheduler service. | 5 // Package engine implements the core logic of the scheduler service. |
6 package engine | 6 package engine |
7 | 7 |
8 import ( | 8 import ( |
9 "bytes" | 9 "bytes" |
10 "encoding/json" | 10 "encoding/json" |
(...skipping 14 matching lines...) Expand all Loading... |
25 ds "github.com/luci/gae/service/datastore" | 25 ds "github.com/luci/gae/service/datastore" |
26 "github.com/luci/gae/service/info" | 26 "github.com/luci/gae/service/info" |
27 mc "github.com/luci/gae/service/memcache" | 27 mc "github.com/luci/gae/service/memcache" |
28 tq "github.com/luci/gae/service/taskqueue" | 28 tq "github.com/luci/gae/service/taskqueue" |
29 | 29 |
30 "github.com/luci/luci-go/common/clock" | 30 "github.com/luci/luci-go/common/clock" |
31 "github.com/luci/luci-go/common/data/rand/mathrand" | 31 "github.com/luci/luci-go/common/data/rand/mathrand" |
32 "github.com/luci/luci-go/common/data/stringset" | 32 "github.com/luci/luci-go/common/data/stringset" |
33 "github.com/luci/luci-go/common/errors" | 33 "github.com/luci/luci-go/common/errors" |
34 "github.com/luci/luci-go/common/logging" | 34 "github.com/luci/luci-go/common/logging" |
| 35 "github.com/luci/luci-go/common/retry/transient" |
35 "github.com/luci/luci-go/server/auth" | 36 "github.com/luci/luci-go/server/auth" |
36 "github.com/luci/luci-go/server/auth/identity" | 37 "github.com/luci/luci-go/server/auth/identity" |
37 "github.com/luci/luci-go/server/auth/signing" | 38 "github.com/luci/luci-go/server/auth/signing" |
38 "github.com/luci/luci-go/server/tokens" | 39 "github.com/luci/luci-go/server/tokens" |
39 | 40 |
40 "github.com/luci/luci-go/scheduler/appengine/catalog" | 41 "github.com/luci/luci-go/scheduler/appengine/catalog" |
41 "github.com/luci/luci-go/scheduler/appengine/schedule" | 42 "github.com/luci/luci-go/scheduler/appengine/schedule" |
42 "github.com/luci/luci-go/scheduler/appengine/task" | 43 "github.com/luci/luci-go/scheduler/appengine/task" |
43 ) | 44 ) |
44 | 45 |
(...skipping 509 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
554 // Check the global cache. | 555 // Check the global cache. |
555 switch _, err := mc.GetKey(c, key); { | 556 switch _, err := mc.GetKey(c, key); { |
556 case err == nil: | 557 case err == nil: |
557 e.lock.Lock() | 558 e.lock.Lock() |
558 defer e.lock.Unlock() | 559 defer e.lock.Unlock() |
559 e.doneFlags[key] = true | 560 e.doneFlags[key] = true |
560 return nil | 561 return nil |
561 case err == mc.ErrCacheMiss: | 562 case err == mc.ErrCacheMiss: |
562 break | 563 break |
563 default: | 564 default: |
564 » » return errors.WrapTransient(err) | 565 » » return transient.Tag.Apply(err) |
565 } | 566 } |
566 | 567 |
567 // Do it. | 568 // Do it. |
568 if err := cb(); err != nil { | 569 if err := cb(); err != nil { |
569 return err | 570 return err |
570 } | 571 } |
571 | 572 |
572 // Store in the global cache. Ignore errors, it's not a big deal. | 573 // Store in the global cache. Ignore errors, it's not a big deal. |
573 item := mc.NewItem(c, key) | 574 item := mc.NewItem(c, key) |
574 item.SetValue([]byte("ok")) | 575 item.SetValue([]byte("ok")) |
575 item.SetExpiration(24 * time.Hour) | 576 item.SetExpiration(24 * time.Hour) |
576 if err := mc.Set(c, item); err != nil { | 577 if err := mc.Set(c, item); err != nil { |
577 logging.Warningf(c, "Failed to write item to memcache - %s", err
) | 578 logging.Warningf(c, "Failed to write item to memcache - %s", err
) |
578 } | 579 } |
579 | 580 |
580 // Store in the local cache. | 581 // Store in the local cache. |
581 e.lock.Lock() | 582 e.lock.Lock() |
582 defer e.lock.Unlock() | 583 defer e.lock.Unlock() |
583 e.doneFlags[key] = true | 584 e.doneFlags[key] = true |
584 return nil | 585 return nil |
585 } | 586 } |
586 | 587 |
587 func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) { | 588 func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) { |
588 q := ds.NewQuery("Job"). | 589 q := ds.NewQuery("Job"). |
589 Eq("Enabled", true). | 590 Eq("Enabled", true). |
590 Project("ProjectID"). | 591 Project("ProjectID"). |
591 Distinct(true) | 592 Distinct(true) |
592 entities := []Job{} | 593 entities := []Job{} |
593 if err := ds.GetAll(c, q, &entities); err != nil { | 594 if err := ds.GetAll(c, q, &entities); err != nil { |
594 » » return nil, errors.WrapTransient(err) | 595 » » return nil, transient.Tag.Apply(err) |
595 } | 596 } |
596 // Filter out duplicates, sort. | 597 // Filter out duplicates, sort. |
597 projects := stringset.New(len(entities)) | 598 projects := stringset.New(len(entities)) |
598 for _, ent := range entities { | 599 for _, ent := range entities { |
599 projects.Add(ent.ProjectID) | 600 projects.Add(ent.ProjectID) |
600 } | 601 } |
601 out := projects.ToSlice() | 602 out := projects.ToSlice() |
602 sort.Strings(out) | 603 sort.Strings(out) |
603 return out, nil | 604 return out, nil |
604 } | 605 } |
605 | 606 |
606 func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) { | 607 func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) { |
607 q := ds.NewQuery("Job").Eq("Enabled", true) | 608 q := ds.NewQuery("Job").Eq("Enabled", true) |
608 return e.queryEnabledJobs(c, q) | 609 return e.queryEnabledJobs(c, q) |
609 } | 610 } |
610 | 611 |
611 func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job
, error) { | 612 func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job
, error) { |
612 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) | 613 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) |
613 return e.queryEnabledJobs(c, q) | 614 return e.queryEnabledJobs(c, q) |
614 } | 615 } |
615 | 616 |
616 func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e
rror) { | 617 func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e
rror) { |
617 entities := []*Job{} | 618 entities := []*Job{} |
618 if err := ds.GetAll(c, q, &entities); err != nil { | 619 if err := ds.GetAll(c, q, &entities); err != nil { |
619 » » return nil, errors.WrapTransient(err) | 620 » » return nil, transient.Tag.Apply(err) |
620 } | 621 } |
621 // Non-ancestor query used, need to recheck filters. | 622 // Non-ancestor query used, need to recheck filters. |
622 filtered := make([]*Job, 0, len(entities)) | 623 filtered := make([]*Job, 0, len(entities)) |
623 for _, job := range entities { | 624 for _, job := range entities { |
624 if job.Enabled { | 625 if job.Enabled { |
625 filtered = append(filtered, job) | 626 filtered = append(filtered, job) |
626 } | 627 } |
627 } | 628 } |
628 return filtered, nil | 629 return filtered, nil |
629 } | 630 } |
630 | 631 |
631 func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { | 632 func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { |
632 job := &Job{JobID: jobID} | 633 job := &Job{JobID: jobID} |
633 switch err := ds.Get(c, job); { | 634 switch err := ds.Get(c, job); { |
634 case err == nil: | 635 case err == nil: |
635 return job, nil | 636 return job, nil |
636 case err == ds.ErrNoSuchEntity: | 637 case err == ds.ErrNoSuchEntity: |
637 return nil, nil | 638 return nil, nil |
638 default: | 639 default: |
639 » » return nil, errors.WrapTransient(err) | 640 » » return nil, transient.Tag.Apply(err) |
640 } | 641 } |
641 } | 642 } |
642 | 643 |
643 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i
nt, cursor string) ([]*Invocation, string, error) { | 644 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i
nt, cursor string) ([]*Invocation, string, error) { |
644 if pageSize == 0 || pageSize > 500 { | 645 if pageSize == 0 || pageSize > 500 { |
645 pageSize = 500 | 646 pageSize = 500 |
646 } | 647 } |
647 | 648 |
648 // Deserialize the cursor. | 649 // Deserialize the cursor. |
649 var cursorObj ds.Cursor | 650 var cursorObj ds.Cursor |
(...skipping 23 matching lines...) Expand all Loading... |
673 return nil | 674 return nil |
674 } | 675 } |
675 c, err := getCursor() | 676 c, err := getCursor() |
676 if err != nil { | 677 if err != nil { |
677 return err | 678 return err |
678 } | 679 } |
679 newCursor = c.String() | 680 newCursor = c.String() |
680 return ds.Stop | 681 return ds.Stop |
681 }) | 682 }) |
682 if err != nil { | 683 if err != nil { |
683 » » return nil, "", errors.WrapTransient(err) | 684 » » return nil, "", transient.Tag.Apply(err) |
684 } | 685 } |
685 return out, newCursor, nil | 686 return out, newCursor, nil |
686 } | 687 } |
687 | 688 |
688 func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64)
(*Invocation, error) { | 689 func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64)
(*Invocation, error) { |
689 inv := &Invocation{ | 690 inv := &Invocation{ |
690 ID: invID, | 691 ID: invID, |
691 JobKey: ds.NewKey(c, "Job", jobID, 0, nil), | 692 JobKey: ds.NewKey(c, "Job", jobID, 0, nil), |
692 } | 693 } |
693 switch err := ds.Get(c, inv); { | 694 switch err := ds.Get(c, inv); { |
694 case err == nil: | 695 case err == nil: |
695 return inv, nil | 696 return inv, nil |
696 case err == ds.ErrNoSuchEntity: | 697 case err == ds.ErrNoSuchEntity: |
697 return nil, nil | 698 return nil, nil |
698 default: | 699 default: |
699 » » return nil, errors.WrapTransient(err) | 700 » » return nil, transient.Tag.Apply(err) |
700 } | 701 } |
701 } | 702 } |
702 | 703 |
703 func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([
]*Invocation, error) { | 704 func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([
]*Invocation, error) { |
704 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) | 705 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) |
705 entities := []*Invocation{} | 706 entities := []*Invocation{} |
706 if err := ds.GetAll(c, q, &entities); err != nil { | 707 if err := ds.GetAll(c, q, &entities); err != nil { |
707 » » return nil, errors.WrapTransient(err) | 708 » » return nil, transient.Tag.Apply(err) |
708 } | 709 } |
709 return entities, nil | 710 return entities, nil |
710 } | 711 } |
711 | 712 |
712 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs
[]catalog.Definition) error { | 713 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs
[]catalog.Definition) error { |
713 // JobID -> *Job map. | 714 // JobID -> *Job map. |
714 existing, err := e.getProjectJobs(c, projectID) | 715 existing, err := e.getProjectJobs(c, projectID) |
715 if err != nil { | 716 if err != nil { |
716 return err | 717 return err |
717 } | 718 } |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
752 go func(i int, jobID string) { | 753 go func(i int, jobID string) { |
753 disableErrs.Assign(i, e.disableJob(c, jobID)) | 754 disableErrs.Assign(i, e.disableJob(c, jobID)) |
754 wg.Done() | 755 wg.Done() |
755 }(i, jobID) | 756 }(i, jobID) |
756 } | 757 } |
757 | 758 |
758 wg.Wait() | 759 wg.Wait() |
759 if updateErrs.Get() == nil && disableErrs.Get() == nil { | 760 if updateErrs.Get() == nil && disableErrs.Get() == nil { |
760 return nil | 761 return nil |
761 } | 762 } |
762 » return errors.WrapTransient(errors.NewMultiError(updateErrs.Get(), disab
leErrs.Get())) | 763 » return transient.Tag.Apply(errors.NewMultiError(updateErrs.Get(), disabl
eErrs.Get())) |
763 } | 764 } |
764 | 765 |
765 func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error { | 766 func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error { |
766 if !info.IsDevAppServer(c) { | 767 if !info.IsDevAppServer(c) { |
767 return errors.New("ResetAllJobsOnDevServer must not be used in p
roduction") | 768 return errors.New("ResetAllJobsOnDevServer must not be used in p
roduction") |
768 } | 769 } |
769 q := ds.NewQuery("Job").Eq("Enabled", true) | 770 q := ds.NewQuery("Job").Eq("Enabled", true) |
770 keys := []*ds.Key{} | 771 keys := []*ds.Key{} |
771 if err := ds.GetAll(c, q, &keys); err != nil { | 772 if err := ds.GetAll(c, q, &keys); err != nil { |
772 » » return errors.WrapTransient(err) | 773 » » return transient.Tag.Apply(err) |
773 } | 774 } |
774 wg := sync.WaitGroup{} | 775 wg := sync.WaitGroup{} |
775 errs := errors.NewLazyMultiError(len(keys)) | 776 errs := errors.NewLazyMultiError(len(keys)) |
776 for i, key := range keys { | 777 for i, key := range keys { |
777 wg.Add(1) | 778 wg.Add(1) |
778 go func(i int, key *ds.Key) { | 779 go func(i int, key *ds.Key) { |
779 errs.Assign(i, e.resetJobOnDevServer(c, key.StringID())) | 780 errs.Assign(i, e.resetJobOnDevServer(c, key.StringID())) |
780 wg.Done() | 781 wg.Done() |
781 }(i, key) | 782 }(i, key) |
782 } | 783 } |
783 wg.Wait() | 784 wg.Wait() |
784 » return errors.WrapTransient(errs.Get()) | 785 » return transient.Tag.Apply(errs.Get()) |
785 } | 786 } |
786 | 787 |
787 // getProjectJobs fetches from ds all enabled jobs belonging to a given | 788 // getProjectJobs fetches from ds all enabled jobs belonging to a given |
788 // project. | 789 // project. |
789 func (e *engineImpl) getProjectJobs(c context.Context, projectID string) (map[st
ring]*Job, error) { | 790 func (e *engineImpl) getProjectJobs(c context.Context, projectID string) (map[st
ring]*Job, error) { |
790 q := ds.NewQuery("Job"). | 791 q := ds.NewQuery("Job"). |
791 Eq("Enabled", true). | 792 Eq("Enabled", true). |
792 Eq("ProjectID", projectID) | 793 Eq("ProjectID", projectID) |
793 entities := []*Job{} | 794 entities := []*Job{} |
794 if err := ds.GetAll(c, q, &entities); err != nil { | 795 if err := ds.GetAll(c, q, &entities); err != nil { |
795 » » return nil, errors.WrapTransient(err) | 796 » » return nil, transient.Tag.Apply(err) |
796 } | 797 } |
797 out := make(map[string]*Job, len(entities)) | 798 out := make(map[string]*Job, len(entities)) |
798 for _, job := range entities { | 799 for _, job := range entities { |
799 if job.Enabled && job.ProjectID == projectID { | 800 if job.Enabled && job.ProjectID == projectID { |
800 out[job.JobID] = job | 801 out[job.JobID] = job |
801 } | 802 } |
802 } | 803 } |
803 return out, nil | 804 return out, nil |
804 } | 805 } |
805 | 806 |
(...skipping 25 matching lines...) Expand all Loading... |
831 logging.Warningf(c, "Retrying transaction...") | 832 logging.Warningf(c, "Retrying transaction...") |
832 } | 833 } |
833 stored := Job{JobID: jobID} | 834 stored := Job{JobID: jobID} |
834 err := ds.Get(c, &stored) | 835 err := ds.Get(c, &stored) |
835 if err != nil && err != ds.ErrNoSuchEntity { | 836 if err != nil && err != ds.ErrNoSuchEntity { |
836 return err | 837 return err |
837 } | 838 } |
838 modified := stored | 839 modified := stored |
839 err = txn(c, &modified, err == ds.ErrNoSuchEntity) | 840 err = txn(c, &modified, err == ds.ErrNoSuchEntity) |
840 if err != nil && err != errSkipPut { | 841 if err != nil && err != errSkipPut { |
841 » » » fatal = !errors.IsTransient(err) | 842 » » » fatal = !transient.Tag.In(err) |
842 return err | 843 return err |
843 } | 844 } |
844 if err != errSkipPut && !modified.isEqual(&stored) { | 845 if err != errSkipPut && !modified.isEqual(&stored) { |
845 return ds.Put(c, &modified) | 846 return ds.Put(c, &modified) |
846 } | 847 } |
847 return nil | 848 return nil |
848 }, &defaultTransactionOptions) | 849 }, &defaultTransactionOptions) |
849 if err != nil { | 850 if err != nil { |
850 logging.Errorf(c, "Job transaction failed: %s", err) | 851 logging.Errorf(c, "Job transaction failed: %s", err) |
851 if fatal { | 852 if fatal { |
852 return err | 853 return err |
853 } | 854 } |
854 // By now err is already transient (since 'fatal' is false) or i
t is commit | 855 // By now err is already transient (since 'fatal' is false) or i
t is commit |
855 // error (i.e. produced by RunInTransaction itself, not by its c
allback). | 856 // error (i.e. produced by RunInTransaction itself, not by its c
allback). |
856 // Need to wrap commit errors too. | 857 // Need to wrap commit errors too. |
857 » » return errors.WrapTransient(err) | 858 » » return transient.Tag.Apply(err) |
858 } | 859 } |
859 if attempt > 1 { | 860 if attempt > 1 { |
860 logging.Infof(c, "Committed on %d attempt", attempt) | 861 logging.Infof(c, "Committed on %d attempt", attempt) |
861 } | 862 } |
862 return nil | 863 return nil |
863 } | 864 } |
864 | 865 |
865 // rollSM is called under transaction to perform a single state machine | 866 // rollSM is called under transaction to perform a single state machine |
866 // transition. It sets up StateMachine instance, calls the callback, mutates | 867 // transition. It sets up StateMachine instance, calls the callback, mutates |
867 // job.State in place (with a new state) and enqueues all emitted actions to | 868 // job.State in place (with a new state) and enqueues all emitted actions to |
868 // task queues. | 869 // task queues. |
869 func (e *engineImpl) rollSM(c context.Context, job *Job, cb func(*StateMachine)
error) error { | 870 func (e *engineImpl) rollSM(c context.Context, job *Job, cb func(*StateMachine)
error) error { |
870 sched, err := job.parseSchedule() | 871 sched, err := job.parseSchedule() |
871 if err != nil { | 872 if err != nil { |
872 return fmt.Errorf("bad schedule %q - %s", job.effectiveSchedule(
), err) | 873 return fmt.Errorf("bad schedule %q - %s", job.effectiveSchedule(
), err) |
873 } | 874 } |
874 now := clock.Now(c).UTC() | 875 now := clock.Now(c).UTC() |
875 rnd := mathrand.Get(c) | 876 rnd := mathrand.Get(c) |
876 sm := StateMachine{ | 877 sm := StateMachine{ |
877 State: job.State, | 878 State: job.State, |
878 Now: now, | 879 Now: now, |
879 Schedule: sched, | 880 Schedule: sched, |
880 Nonce: func() int64 { return rnd.Int63() + 1 }, | 881 Nonce: func() int64 { return rnd.Int63() + 1 }, |
881 Context: c, | 882 Context: c, |
882 } | 883 } |
883 // All errors returned by state machine transition changes are transient
. | 884 // All errors returned by state machine transition changes are transient
. |
884 // Fatal errors (when we have them) should be reflected as a state chang
ing | 885 // Fatal errors (when we have them) should be reflected as a state chang
ing |
885 // into "BROKEN" state. | 886 // into "BROKEN" state. |
886 if err := cb(&sm); err != nil { | 887 if err := cb(&sm); err != nil { |
887 » » return errors.WrapTransient(err) | 888 » » return transient.Tag.Apply(err) |
888 } | 889 } |
889 if len(sm.Actions) != 0 { | 890 if len(sm.Actions) != 0 { |
890 if err := e.enqueueJobActions(c, job.JobID, sm.Actions); err !=
nil { | 891 if err := e.enqueueJobActions(c, job.JobID, sm.Actions); err !=
nil { |
891 return err | 892 return err |
892 } | 893 } |
893 } | 894 } |
894 if sm.State.State != job.State.State { | 895 if sm.State.State != job.State.State { |
895 logging.Infof(c, "%s -> %s", job.State.State, sm.State.State) | 896 logging.Infof(c, "%s -> %s", job.State.State, sm.State.State) |
896 } | 897 } |
897 job.State = sm.State | 898 job.State = sm.State |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
967 i := 0 | 968 i := 0 |
968 for queueName, tasks := range qs { | 969 for queueName, tasks := range qs { |
969 wg.Add(1) | 970 wg.Add(1) |
970 go func(i int, queueName string, tasks []*tq.Task) { | 971 go func(i int, queueName string, tasks []*tq.Task) { |
971 errs.Assign(i, tq.Add(c, queueName, tasks...)) | 972 errs.Assign(i, tq.Add(c, queueName, tasks...)) |
972 wg.Done() | 973 wg.Done() |
973 }(i, queueName, tasks) | 974 }(i, queueName, tasks) |
974 i++ | 975 i++ |
975 } | 976 } |
976 wg.Wait() | 977 wg.Wait() |
977 » return errors.WrapTransient(errs.Get()) | 978 » return transient.Tag.Apply(errs.Get()) |
978 } | 979 } |
979 | 980 |
980 // enqueueInvTimers submits all timers emitted by an invocation manager by | 981 // enqueueInvTimers submits all timers emitted by an invocation manager by |
981 // adding corresponding tasks to the task queue. See ExecuteSerializedAction for | 982 // adding corresponding tasks to the task queue. See ExecuteSerializedAction for |
982 // place where these actions are interpreted. | 983 // place where these actions are interpreted. |
983 func (e *engineImpl) enqueueInvTimers(c context.Context, inv *Invocation, timers
[]invocationTimer) error { | 984 func (e *engineImpl) enqueueInvTimers(c context.Context, inv *Invocation, timers
[]invocationTimer) error { |
984 tasks := make([]*tq.Task, len(timers)) | 985 tasks := make([]*tq.Task, len(timers)) |
985 for i, timer := range timers { | 986 for i, timer := range timers { |
986 payload, err := json.Marshal(actionTaskPayload{ | 987 payload, err := json.Marshal(actionTaskPayload{ |
987 JobID: inv.JobKey.StringID(), | 988 JobID: inv.JobKey.StringID(), |
988 InvID: inv.ID, | 989 InvID: inv.ID, |
989 InvTimer: &timer, | 990 InvTimer: &timer, |
990 }) | 991 }) |
991 if err != nil { | 992 if err != nil { |
992 return err | 993 return err |
993 } | 994 } |
994 tasks[i] = &tq.Task{ | 995 tasks[i] = &tq.Task{ |
995 Path: e.TimersQueuePath, | 996 Path: e.TimersQueuePath, |
996 ETA: clock.Now(c).Add(timer.Delay), | 997 ETA: clock.Now(c).Add(timer.Delay), |
997 Payload: payload, | 998 Payload: payload, |
998 } | 999 } |
999 } | 1000 } |
1000 » return errors.WrapTransient(tq.Add(c, e.TimersQueueName, tasks...)) | 1001 » return transient.Tag.Apply(tq.Add(c, e.TimersQueueName, tasks...)) |
1001 } | 1002 } |
1002 | 1003 |
1003 func (e *engineImpl) ExecuteSerializedAction(c context.Context, action []byte, r
etryCount int) error { | 1004 func (e *engineImpl) ExecuteSerializedAction(c context.Context, action []byte, r
etryCount int) error { |
1004 payload := actionTaskPayload{} | 1005 payload := actionTaskPayload{} |
1005 if err := json.Unmarshal(action, &payload); err != nil { | 1006 if err := json.Unmarshal(action, &payload); err != nil { |
1006 return err | 1007 return err |
1007 } | 1008 } |
1008 if payload.InvID == 0 { | 1009 if payload.InvID == 0 { |
1009 return e.executeJobAction(c, &payload, retryCount) | 1010 return e.executeJobAction(c, &payload, retryCount) |
1010 } | 1011 } |
(...skipping 272 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1283 Started: now, | 1284 Started: now, |
1284 Finished: now, | 1285 Finished: now, |
1285 Status: task.StatusOverrun, | 1286 Status: task.StatusOverrun, |
1286 } | 1287 } |
1287 if runningInvID == 0 { | 1288 if runningInvID == 0 { |
1288 inv.debugLog(c, "New invocation should be starting now, but prev
ious one is still starting") | 1289 inv.debugLog(c, "New invocation should be starting now, but prev
ious one is still starting") |
1289 } else { | 1290 } else { |
1290 inv.debugLog(c, "New invocation should be starting now, but prev
ious one is still running: %d", runningInvID) | 1291 inv.debugLog(c, "New invocation should be starting now, but prev
ious one is still running: %d", runningInvID) |
1291 } | 1292 } |
1292 inv.debugLog(c, "Total overruns thus far: %d", overruns) | 1293 inv.debugLog(c, "Total overruns thus far: %d", overruns) |
1293 » return errors.WrapTransient(ds.Put(c, &inv)) | 1294 » return transient.Tag.Apply(ds.Put(c, &inv)) |
1294 } | 1295 } |
1295 | 1296 |
1296 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks. | 1297 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks. |
1297 // | 1298 // |
1298 // See also handlePubSubMessage, it is quite similar. | 1299 // See also handlePubSubMessage, it is quite similar. |
1299 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
int64, timer *invocationTimer) error { | 1300 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
int64, timer *invocationTimer) error { |
1300 c = logging.SetField(c, "JobID", jobID) | 1301 c = logging.SetField(c, "JobID", jobID) |
1301 c = logging.SetField(c, "InvID", invID) | 1302 c = logging.SetField(c, "InvID", invID) |
1302 | 1303 |
1303 logging.Infof(c, "Handling invocation timer %q", timer.Name) | 1304 logging.Infof(c, "Handling invocation timer %q", timer.Name) |
(...skipping 16 matching lines...) Expand all Loading... |
1320 ctl, err := e.controllerForInvocation(c, inv) | 1321 ctl, err := e.controllerForInvocation(c, inv) |
1321 if err != nil { | 1322 if err != nil { |
1322 logging.Errorf(c, "Cannot get controller - %s", err) | 1323 logging.Errorf(c, "Cannot get controller - %s", err) |
1323 return err | 1324 return err |
1324 } | 1325 } |
1325 | 1326 |
1326 // Hand the message to the TaskManager. | 1327 // Hand the message to the TaskManager. |
1327 err = ctl.manager.HandleTimer(c, ctl, timer.Name, timer.Payload) | 1328 err = ctl.manager.HandleTimer(c, ctl, timer.Name, timer.Payload) |
1328 if err != nil { | 1329 if err != nil { |
1329 logging.Errorf(c, "Error when handling the timer - %s", err) | 1330 logging.Errorf(c, "Error when handling the timer - %s", err) |
1330 » » if !errors.IsTransient(err) && ctl.State().Status != task.Status
Failed { | 1331 » » if !transient.Tag.In(err) && ctl.State().Status != task.StatusFa
iled { |
1331 ctl.DebugLog("Fatal error when handling timer, aborting
invocation - %s", err) | 1332 ctl.DebugLog("Fatal error when handling timer, aborting
invocation - %s", err) |
1332 ctl.State().Status = task.StatusFailed | 1333 ctl.State().Status = task.StatusFailed |
1333 } | 1334 } |
1334 } | 1335 } |
1335 | 1336 |
1336 // Save anyway, to preserve the invocation log. | 1337 // Save anyway, to preserve the invocation log. |
1337 saveErr := ctl.Save(c) | 1338 saveErr := ctl.Save(c) |
1338 if saveErr != nil { | 1339 if saveErr != nil { |
1339 logging.Errorf(c, "Error when saving the invocation - %s", saveE
rr) | 1340 logging.Errorf(c, "Error when saving the invocation - %s", saveE
rr) |
1340 } | 1341 } |
1341 | 1342 |
1342 // Retry the delivery if at least one error is transient. HandleTimer mu
st be | 1343 // Retry the delivery if at least one error is transient. HandleTimer mu
st be |
1343 // idempotent. | 1344 // idempotent. |
1344 switch { | 1345 switch { |
1345 case err == nil && saveErr == nil: | 1346 case err == nil && saveErr == nil: |
1346 return nil | 1347 return nil |
1347 » case errors.IsTransient(saveErr): | 1348 » case transient.Tag.In(saveErr): |
1348 return saveErr | 1349 return saveErr |
1349 default: | 1350 default: |
1350 return err // transient or fatal | 1351 return err // transient or fatal |
1351 } | 1352 } |
1352 } | 1353 } |
1353 | 1354 |
1354 // startInvocation is called via task queue to start running a job. This call | 1355 // startInvocation is called via task queue to start running a job. This call |
1355 // may be retried by task queue service. | 1356 // may be retried by task queue service. |
1356 func (e *engineImpl) startInvocation(c context.Context, jobID string, invocation
Nonce int64, | 1357 func (e *engineImpl) startInvocation(c context.Context, jobID string, invocation
Nonce int64, |
1357 triggeredBy identity.Identity, retryCount int) error { | 1358 triggeredBy identity.Identity, retryCount int) error { |
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1487 // In either case, invocation never ends up in StatusStarting state. | 1488 // In either case, invocation never ends up in StatusStarting state. |
1488 err = ctl.manager.LaunchTask(c, ctl) | 1489 err = ctl.manager.LaunchTask(c, ctl) |
1489 if ctl.State().Status == task.StatusStarting { | 1490 if ctl.State().Status == task.StatusStarting { |
1490 ctl.State().Status = task.StatusFailed | 1491 ctl.State().Status = task.StatusFailed |
1491 if err == nil { | 1492 if err == nil { |
1492 err = fmt.Errorf("LaunchTask didn't move invocation out
of StatusStarting") | 1493 err = fmt.Errorf("LaunchTask didn't move invocation out
of StatusStarting") |
1493 } | 1494 } |
1494 } | 1495 } |
1495 | 1496 |
1496 // Give up retrying on transient errors after some number of attempts. | 1497 // Give up retrying on transient errors after some number of attempts. |
1497 » if errors.IsTransient(err) && retryCount+1 >= invocationRetryLimit { | 1498 » if transient.Tag.In(err) && retryCount+1 >= invocationRetryLimit { |
1498 err = fmt.Errorf("Too many retries, giving up (original error -
%s)", err) | 1499 err = fmt.Errorf("Too many retries, giving up (original error -
%s)", err) |
1499 } | 1500 } |
1500 | 1501 |
1501 // If asked to retry the invocation (by returning a transient error), do
not | 1502 // If asked to retry the invocation (by returning a transient error), do
not |
1502 // touch Job entity when saving the current (failed) invocation. That wa
y Job | 1503 // touch Job entity when saving the current (failed) invocation. That wa
y Job |
1503 // stays in "QUEUED" state (indicating it's queued for a new invocation)
. | 1504 // stays in "QUEUED" state (indicating it's queued for a new invocation)
. |
1504 » retryInvocation := errors.IsTransient(err) | 1505 » if saveErr := ctl.saveImpl(c, !transient.Tag.In(err)); saveErr != nil { |
1505 » if saveErr := ctl.saveImpl(c, !retryInvocation); saveErr != nil { | |
1506 logging.Errorf(c, "Failed to save invocation state - %s", saveEr
r) | 1506 logging.Errorf(c, "Failed to save invocation state - %s", saveEr
r) |
1507 if err == nil { | 1507 if err == nil { |
1508 err = saveErr | 1508 err = saveErr |
1509 } | 1509 } |
1510 } | 1510 } |
1511 | 1511 |
1512 // Returning transient error here causes the task queue to retry the tas
k. | 1512 // Returning transient error here causes the task queue to retry the tas
k. |
1513 if err != nil { | 1513 if err != nil { |
1514 logging.WithError(err).Errorf(c, "Invocation failed to start") | 1514 logging.WithError(err).Errorf(c, "Invocation failed to start") |
1515 } | 1515 } |
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1655 _, sub := e.genTopicAndSubNames(c, taskManagerName, publisher) | 1655 _, sub := e.genTopicAndSubNames(c, taskManagerName, publisher) |
1656 msg, ack, err := pullSubcription(c, sub, "") | 1656 msg, ack, err := pullSubcription(c, sub, "") |
1657 if err != nil { | 1657 if err != nil { |
1658 return err | 1658 return err |
1659 } | 1659 } |
1660 if msg == nil { | 1660 if msg == nil { |
1661 logging.Infof(c, "No new PubSub messages") | 1661 logging.Infof(c, "No new PubSub messages") |
1662 return nil | 1662 return nil |
1663 } | 1663 } |
1664 err = e.handlePubSubMessage(c, msg) | 1664 err = e.handlePubSubMessage(c, msg) |
1665 » if err == nil || !errors.IsTransient(err) { | 1665 » if err == nil || !transient.Tag.In(err) { |
1666 ack() // ack only on success or fatal errors (to stop redelivery
) | 1666 ack() // ack only on success or fatal errors (to stop redelivery
) |
1667 } | 1667 } |
1668 return err | 1668 return err |
1669 } | 1669 } |
1670 | 1670 |
1671 func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe
ssage) error { | 1671 func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe
ssage) error { |
1672 logging.Infof(c, "Received PubSub message %q", msg.MessageId) | 1672 logging.Infof(c, "Received PubSub message %q", msg.MessageId) |
1673 | 1673 |
1674 // Extract Job and Invocation ID from validated auth_token. | 1674 // Extract Job and Invocation ID from validated auth_token. |
1675 var jobID string | 1675 var jobID string |
(...skipping 30 matching lines...) Expand all Loading... |
1706 ctl, err := e.controllerForInvocation(c, inv) | 1706 ctl, err := e.controllerForInvocation(c, inv) |
1707 if err != nil { | 1707 if err != nil { |
1708 logging.Errorf(c, "Cannot get controller - %s", err) | 1708 logging.Errorf(c, "Cannot get controller - %s", err) |
1709 return err | 1709 return err |
1710 } | 1710 } |
1711 | 1711 |
1712 // Hand the message to the TaskManager. | 1712 // Hand the message to the TaskManager. |
1713 err = ctl.manager.HandleNotification(c, ctl, msg) | 1713 err = ctl.manager.HandleNotification(c, ctl, msg) |
1714 if err != nil { | 1714 if err != nil { |
1715 logging.Errorf(c, "Error when handling the message - %s", err) | 1715 logging.Errorf(c, "Error when handling the message - %s", err) |
1716 » » if !errors.IsTransient(err) && ctl.State().Status != task.Status
Failed { | 1716 » » if !transient.Tag.In(err) && ctl.State().Status != task.StatusFa
iled { |
1717 ctl.DebugLog("Fatal error when handling PubSub notificat
ion, aborting invocation - %s", err) | 1717 ctl.DebugLog("Fatal error when handling PubSub notificat
ion, aborting invocation - %s", err) |
1718 ctl.State().Status = task.StatusFailed | 1718 ctl.State().Status = task.StatusFailed |
1719 } | 1719 } |
1720 } | 1720 } |
1721 | 1721 |
1722 // Save anyway, to preserve the invocation log. | 1722 // Save anyway, to preserve the invocation log. |
1723 saveErr := ctl.Save(c) | 1723 saveErr := ctl.Save(c) |
1724 if saveErr != nil { | 1724 if saveErr != nil { |
1725 logging.Errorf(c, "Error when saving the invocation - %s", saveE
rr) | 1725 logging.Errorf(c, "Error when saving the invocation - %s", saveE
rr) |
1726 } | 1726 } |
1727 | 1727 |
1728 // Retry the delivery if at least one error is transient. HandleNotifica
tion | 1728 // Retry the delivery if at least one error is transient. HandleNotifica
tion |
1729 // must be idempotent. | 1729 // must be idempotent. |
1730 switch { | 1730 switch { |
1731 case err == nil && saveErr == nil: | 1731 case err == nil && saveErr == nil: |
1732 return nil | 1732 return nil |
1733 » case errors.IsTransient(saveErr): | 1733 » case transient.Tag.In(saveErr): |
1734 return saveErr | 1734 return saveErr |
1735 default: | 1735 default: |
1736 return err // transient or fatal | 1736 return err // transient or fatal |
1737 } | 1737 } |
1738 } | 1738 } |
1739 | 1739 |
1740 //////////////////////////////////////////////////////////////////////////////// | 1740 //////////////////////////////////////////////////////////////////////////////// |
1741 // TaskController. | 1741 // TaskController. |
1742 | 1742 |
1743 type taskController struct { | 1743 type taskController struct { |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1824 } | 1824 } |
1825 | 1825 |
1826 // Save is part of task.Controller interface. | 1826 // Save is part of task.Controller interface. |
1827 func (ctl *taskController) Save(ctx context.Context) error { | 1827 func (ctl *taskController) Save(ctx context.Context) error { |
1828 return ctl.saveImpl(ctx, true) | 1828 return ctl.saveImpl(ctx, true) |
1829 } | 1829 } |
1830 | 1830 |
1831 // errUpdateConflict means Invocation is being modified by two TaskController's | 1831 // errUpdateConflict means Invocation is being modified by two TaskController's |
1832 // concurrently. It should not be happening often. If it happens, task queue | 1832 // concurrently. It should not be happening often. If it happens, task queue |
1833 // call is retried to rerun the two-part transaction from scratch. | 1833 // call is retried to rerun the two-part transaction from scratch. |
1834 var errUpdateConflict = errors.WrapTransient(errors.New("concurrent modification
s of single Invocation")) | 1834 var errUpdateConflict = errors.New("concurrent modifications of single Invocatio
n", transient.Tag) |
1835 | 1835 |
1836 // saveImpl uploads updated Invocation to the datastore. If updateJob is true, | 1836 // saveImpl uploads updated Invocation to the datastore. If updateJob is true, |
1837 // it will also roll corresponding state machine forward. | 1837 // it will also roll corresponding state machine forward. |
1838 func (ctl *taskController) saveImpl(ctx context.Context, updateJob bool) (err er
ror) { | 1838 func (ctl *taskController) saveImpl(ctx context.Context, updateJob bool) (err er
ror) { |
1839 // Mutate copy in case transaction below fails. Also unpacks ctl.state b
ack | 1839 // Mutate copy in case transaction below fails. Also unpacks ctl.state b
ack |
1840 // into the entity (reverse of 'populateState'). | 1840 // into the entity (reverse of 'populateState'). |
1841 saving := ctl.saved | 1841 saving := ctl.saved |
1842 saving.Status = ctl.state.Status | 1842 saving.Status = ctl.state.Status |
1843 saving.TaskData = append([]byte(nil), ctl.state.TaskData...) | 1843 saving.TaskData = append([]byte(nil), ctl.state.TaskData...) |
1844 saving.ViewURL = ctl.state.ViewURL | 1844 saving.ViewURL = ctl.state.ViewURL |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1880 // expect it to be. | 1880 // expect it to be. |
1881 mostRecent := Invocation{ | 1881 mostRecent := Invocation{ |
1882 ID: saving.ID, | 1882 ID: saving.ID, |
1883 JobKey: saving.JobKey, | 1883 JobKey: saving.JobKey, |
1884 } | 1884 } |
1885 switch err := ds.Get(c, &mostRecent); { | 1885 switch err := ds.Get(c, &mostRecent); { |
1886 case err == ds.ErrNoSuchEntity: // should not happen | 1886 case err == ds.ErrNoSuchEntity: // should not happen |
1887 logging.Errorf(c, "Invocation is suddenly gone") | 1887 logging.Errorf(c, "Invocation is suddenly gone") |
1888 return errors.New("invocation is suddenly gone") | 1888 return errors.New("invocation is suddenly gone") |
1889 case err != nil: | 1889 case err != nil: |
1890 » » » return errors.WrapTransient(err) | 1890 » » » return transient.Tag.Apply(err) |
1891 } | 1891 } |
1892 | 1892 |
1893 // Make sure no one touched it while we were handling the invoca
tion. | 1893 // Make sure no one touched it while we were handling the invoca
tion. |
1894 if saving.MutationsCount != mostRecent.MutationsCount+1 { | 1894 if saving.MutationsCount != mostRecent.MutationsCount+1 { |
1895 logging.Errorf(c, "Invocation was modified by someone el
se while we were handling it") | 1895 logging.Errorf(c, "Invocation was modified by someone el
se while we were handling it") |
1896 return errUpdateConflict | 1896 return errUpdateConflict |
1897 } | 1897 } |
1898 | 1898 |
1899 // Store the invocation entity and schedule invocation timers re
gardless of | 1899 // Store the invocation entity and schedule invocation timers re
gardless of |
1900 // the current state of the Job entity. The table of all invocat
ions is | 1900 // the current state of the Job entity. The table of all invocat
ions is |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1937 } | 1937 } |
1938 if hasFinished { | 1938 if hasFinished { |
1939 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err
or { | 1939 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err
or { |
1940 sm.OnInvocationDone(saving.ID) | 1940 sm.OnInvocationDone(saving.ID) |
1941 return nil | 1941 return nil |
1942 }) | 1942 }) |
1943 } | 1943 } |
1944 return nil | 1944 return nil |
1945 }) | 1945 }) |
1946 } | 1946 } |
OLD | NEW |