Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(63)

Side by Side Diff: scheduler/appengine/engine/engine.go

Issue 2951393002: [errors] de-specialize Transient in favor of Tags. (Closed)
Patch Set: more refactor Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 // Package engine implements the core logic of the scheduler service. 5 // Package engine implements the core logic of the scheduler service.
6 package engine 6 package engine
7 7
8 import ( 8 import (
9 "bytes" 9 "bytes"
10 "encoding/json" 10 "encoding/json"
(...skipping 14 matching lines...) Expand all
25 ds "github.com/luci/gae/service/datastore" 25 ds "github.com/luci/gae/service/datastore"
26 "github.com/luci/gae/service/info" 26 "github.com/luci/gae/service/info"
27 mc "github.com/luci/gae/service/memcache" 27 mc "github.com/luci/gae/service/memcache"
28 tq "github.com/luci/gae/service/taskqueue" 28 tq "github.com/luci/gae/service/taskqueue"
29 29
30 "github.com/luci/luci-go/common/clock" 30 "github.com/luci/luci-go/common/clock"
31 "github.com/luci/luci-go/common/data/rand/mathrand" 31 "github.com/luci/luci-go/common/data/rand/mathrand"
32 "github.com/luci/luci-go/common/data/stringset" 32 "github.com/luci/luci-go/common/data/stringset"
33 "github.com/luci/luci-go/common/errors" 33 "github.com/luci/luci-go/common/errors"
34 "github.com/luci/luci-go/common/logging" 34 "github.com/luci/luci-go/common/logging"
35 "github.com/luci/luci-go/common/retry/transient"
35 "github.com/luci/luci-go/server/auth" 36 "github.com/luci/luci-go/server/auth"
36 "github.com/luci/luci-go/server/auth/identity" 37 "github.com/luci/luci-go/server/auth/identity"
37 "github.com/luci/luci-go/server/auth/signing" 38 "github.com/luci/luci-go/server/auth/signing"
38 "github.com/luci/luci-go/server/tokens" 39 "github.com/luci/luci-go/server/tokens"
39 40
40 "github.com/luci/luci-go/scheduler/appengine/catalog" 41 "github.com/luci/luci-go/scheduler/appengine/catalog"
41 "github.com/luci/luci-go/scheduler/appengine/schedule" 42 "github.com/luci/luci-go/scheduler/appengine/schedule"
42 "github.com/luci/luci-go/scheduler/appengine/task" 43 "github.com/luci/luci-go/scheduler/appengine/task"
43 ) 44 )
44 45
(...skipping 509 matching lines...) Expand 10 before | Expand all | Expand 10 after
554 // Check the global cache. 555 // Check the global cache.
555 switch _, err := mc.GetKey(c, key); { 556 switch _, err := mc.GetKey(c, key); {
556 case err == nil: 557 case err == nil:
557 e.lock.Lock() 558 e.lock.Lock()
558 defer e.lock.Unlock() 559 defer e.lock.Unlock()
559 e.doneFlags[key] = true 560 e.doneFlags[key] = true
560 return nil 561 return nil
561 case err == mc.ErrCacheMiss: 562 case err == mc.ErrCacheMiss:
562 break 563 break
563 default: 564 default:
564 » » return errors.WrapTransient(err) 565 » » return transient.Tag.Apply(err)
565 } 566 }
566 567
567 // Do it. 568 // Do it.
568 if err := cb(); err != nil { 569 if err := cb(); err != nil {
569 return err 570 return err
570 } 571 }
571 572
572 // Store in the global cache. Ignore errors, it's not a big deal. 573 // Store in the global cache. Ignore errors, it's not a big deal.
573 item := mc.NewItem(c, key) 574 item := mc.NewItem(c, key)
574 item.SetValue([]byte("ok")) 575 item.SetValue([]byte("ok"))
575 item.SetExpiration(24 * time.Hour) 576 item.SetExpiration(24 * time.Hour)
576 if err := mc.Set(c, item); err != nil { 577 if err := mc.Set(c, item); err != nil {
577 logging.Warningf(c, "Failed to write item to memcache - %s", err ) 578 logging.Warningf(c, "Failed to write item to memcache - %s", err )
578 } 579 }
579 580
580 // Store in the local cache. 581 // Store in the local cache.
581 e.lock.Lock() 582 e.lock.Lock()
582 defer e.lock.Unlock() 583 defer e.lock.Unlock()
583 e.doneFlags[key] = true 584 e.doneFlags[key] = true
584 return nil 585 return nil
585 } 586 }
586 587
587 func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) { 588 func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) {
588 q := ds.NewQuery("Job"). 589 q := ds.NewQuery("Job").
589 Eq("Enabled", true). 590 Eq("Enabled", true).
590 Project("ProjectID"). 591 Project("ProjectID").
591 Distinct(true) 592 Distinct(true)
592 entities := []Job{} 593 entities := []Job{}
593 if err := ds.GetAll(c, q, &entities); err != nil { 594 if err := ds.GetAll(c, q, &entities); err != nil {
594 » » return nil, errors.WrapTransient(err) 595 » » return nil, transient.Tag.Apply(err)
595 } 596 }
596 // Filter out duplicates, sort. 597 // Filter out duplicates, sort.
597 projects := stringset.New(len(entities)) 598 projects := stringset.New(len(entities))
598 for _, ent := range entities { 599 for _, ent := range entities {
599 projects.Add(ent.ProjectID) 600 projects.Add(ent.ProjectID)
600 } 601 }
601 out := projects.ToSlice() 602 out := projects.ToSlice()
602 sort.Strings(out) 603 sort.Strings(out)
603 return out, nil 604 return out, nil
604 } 605 }
605 606
606 func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) { 607 func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) {
607 q := ds.NewQuery("Job").Eq("Enabled", true) 608 q := ds.NewQuery("Job").Eq("Enabled", true)
608 return e.queryEnabledJobs(c, q) 609 return e.queryEnabledJobs(c, q)
609 } 610 }
610 611
611 func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job , error) { 612 func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job , error) {
612 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) 613 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID)
613 return e.queryEnabledJobs(c, q) 614 return e.queryEnabledJobs(c, q)
614 } 615 }
615 616
616 func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e rror) { 617 func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e rror) {
617 entities := []*Job{} 618 entities := []*Job{}
618 if err := ds.GetAll(c, q, &entities); err != nil { 619 if err := ds.GetAll(c, q, &entities); err != nil {
619 » » return nil, errors.WrapTransient(err) 620 » » return nil, transient.Tag.Apply(err)
620 } 621 }
621 // Non-ancestor query used, need to recheck filters. 622 // Non-ancestor query used, need to recheck filters.
622 filtered := make([]*Job, 0, len(entities)) 623 filtered := make([]*Job, 0, len(entities))
623 for _, job := range entities { 624 for _, job := range entities {
624 if job.Enabled { 625 if job.Enabled {
625 filtered = append(filtered, job) 626 filtered = append(filtered, job)
626 } 627 }
627 } 628 }
628 return filtered, nil 629 return filtered, nil
629 } 630 }
630 631
631 func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { 632 func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) {
632 job := &Job{JobID: jobID} 633 job := &Job{JobID: jobID}
633 switch err := ds.Get(c, job); { 634 switch err := ds.Get(c, job); {
634 case err == nil: 635 case err == nil:
635 return job, nil 636 return job, nil
636 case err == ds.ErrNoSuchEntity: 637 case err == ds.ErrNoSuchEntity:
637 return nil, nil 638 return nil, nil
638 default: 639 default:
639 » » return nil, errors.WrapTransient(err) 640 » » return nil, transient.Tag.Apply(err)
640 } 641 }
641 } 642 }
642 643
643 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i nt, cursor string) ([]*Invocation, string, error) { 644 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i nt, cursor string) ([]*Invocation, string, error) {
644 if pageSize == 0 || pageSize > 500 { 645 if pageSize == 0 || pageSize > 500 {
645 pageSize = 500 646 pageSize = 500
646 } 647 }
647 648
648 // Deserialize the cursor. 649 // Deserialize the cursor.
649 var cursorObj ds.Cursor 650 var cursorObj ds.Cursor
(...skipping 23 matching lines...) Expand all
673 return nil 674 return nil
674 } 675 }
675 c, err := getCursor() 676 c, err := getCursor()
676 if err != nil { 677 if err != nil {
677 return err 678 return err
678 } 679 }
679 newCursor = c.String() 680 newCursor = c.String()
680 return ds.Stop 681 return ds.Stop
681 }) 682 })
682 if err != nil { 683 if err != nil {
683 » » return nil, "", errors.WrapTransient(err) 684 » » return nil, "", transient.Tag.Apply(err)
684 } 685 }
685 return out, newCursor, nil 686 return out, newCursor, nil
686 } 687 }
687 688
688 func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) { 689 func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64) (*Invocation, error) {
689 inv := &Invocation{ 690 inv := &Invocation{
690 ID: invID, 691 ID: invID,
691 JobKey: ds.NewKey(c, "Job", jobID, 0, nil), 692 JobKey: ds.NewKey(c, "Job", jobID, 0, nil),
692 } 693 }
693 switch err := ds.Get(c, inv); { 694 switch err := ds.Get(c, inv); {
694 case err == nil: 695 case err == nil:
695 return inv, nil 696 return inv, nil
696 case err == ds.ErrNoSuchEntity: 697 case err == ds.ErrNoSuchEntity:
697 return nil, nil 698 return nil, nil
698 default: 699 default:
699 » » return nil, errors.WrapTransient(err) 700 » » return nil, transient.Tag.Apply(err)
700 } 701 }
701 } 702 }
702 703
703 func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([ ]*Invocation, error) { 704 func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([ ]*Invocation, error) {
704 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) 705 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce)
705 entities := []*Invocation{} 706 entities := []*Invocation{}
706 if err := ds.GetAll(c, q, &entities); err != nil { 707 if err := ds.GetAll(c, q, &entities); err != nil {
707 » » return nil, errors.WrapTransient(err) 708 » » return nil, transient.Tag.Apply(err)
708 } 709 }
709 return entities, nil 710 return entities, nil
710 } 711 }
711 712
712 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error { 713 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs []catalog.Definition) error {
713 // JobID -> *Job map. 714 // JobID -> *Job map.
714 existing, err := e.getProjectJobs(c, projectID) 715 existing, err := e.getProjectJobs(c, projectID)
715 if err != nil { 716 if err != nil {
716 return err 717 return err
717 } 718 }
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
752 go func(i int, jobID string) { 753 go func(i int, jobID string) {
753 disableErrs.Assign(i, e.disableJob(c, jobID)) 754 disableErrs.Assign(i, e.disableJob(c, jobID))
754 wg.Done() 755 wg.Done()
755 }(i, jobID) 756 }(i, jobID)
756 } 757 }
757 758
758 wg.Wait() 759 wg.Wait()
759 if updateErrs.Get() == nil && disableErrs.Get() == nil { 760 if updateErrs.Get() == nil && disableErrs.Get() == nil {
760 return nil 761 return nil
761 } 762 }
762 » return errors.WrapTransient(errors.NewMultiError(updateErrs.Get(), disab leErrs.Get())) 763 » return transient.Tag.Apply(errors.NewMultiError(updateErrs.Get(), disabl eErrs.Get()))
763 } 764 }
764 765
765 func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error { 766 func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error {
766 if !info.IsDevAppServer(c) { 767 if !info.IsDevAppServer(c) {
767 return errors.New("ResetAllJobsOnDevServer must not be used in p roduction") 768 return errors.New("ResetAllJobsOnDevServer must not be used in p roduction")
768 } 769 }
769 q := ds.NewQuery("Job").Eq("Enabled", true) 770 q := ds.NewQuery("Job").Eq("Enabled", true)
770 keys := []*ds.Key{} 771 keys := []*ds.Key{}
771 if err := ds.GetAll(c, q, &keys); err != nil { 772 if err := ds.GetAll(c, q, &keys); err != nil {
772 » » return errors.WrapTransient(err) 773 » » return transient.Tag.Apply(err)
773 } 774 }
774 wg := sync.WaitGroup{} 775 wg := sync.WaitGroup{}
775 errs := errors.NewLazyMultiError(len(keys)) 776 errs := errors.NewLazyMultiError(len(keys))
776 for i, key := range keys { 777 for i, key := range keys {
777 wg.Add(1) 778 wg.Add(1)
778 go func(i int, key *ds.Key) { 779 go func(i int, key *ds.Key) {
779 errs.Assign(i, e.resetJobOnDevServer(c, key.StringID())) 780 errs.Assign(i, e.resetJobOnDevServer(c, key.StringID()))
780 wg.Done() 781 wg.Done()
781 }(i, key) 782 }(i, key)
782 } 783 }
783 wg.Wait() 784 wg.Wait()
784 » return errors.WrapTransient(errs.Get()) 785 » return transient.Tag.Apply(errs.Get())
785 } 786 }
786 787
787 // getProjectJobs fetches from ds all enabled jobs belonging to a given 788 // getProjectJobs fetches from ds all enabled jobs belonging to a given
788 // project. 789 // project.
789 func (e *engineImpl) getProjectJobs(c context.Context, projectID string) (map[st ring]*Job, error) { 790 func (e *engineImpl) getProjectJobs(c context.Context, projectID string) (map[st ring]*Job, error) {
790 q := ds.NewQuery("Job"). 791 q := ds.NewQuery("Job").
791 Eq("Enabled", true). 792 Eq("Enabled", true).
792 Eq("ProjectID", projectID) 793 Eq("ProjectID", projectID)
793 entities := []*Job{} 794 entities := []*Job{}
794 if err := ds.GetAll(c, q, &entities); err != nil { 795 if err := ds.GetAll(c, q, &entities); err != nil {
795 » » return nil, errors.WrapTransient(err) 796 » » return nil, transient.Tag.Apply(err)
796 } 797 }
797 out := make(map[string]*Job, len(entities)) 798 out := make(map[string]*Job, len(entities))
798 for _, job := range entities { 799 for _, job := range entities {
799 if job.Enabled && job.ProjectID == projectID { 800 if job.Enabled && job.ProjectID == projectID {
800 out[job.JobID] = job 801 out[job.JobID] = job
801 } 802 }
802 } 803 }
803 return out, nil 804 return out, nil
804 } 805 }
805 806
(...skipping 25 matching lines...) Expand all
831 logging.Warningf(c, "Retrying transaction...") 832 logging.Warningf(c, "Retrying transaction...")
832 } 833 }
833 stored := Job{JobID: jobID} 834 stored := Job{JobID: jobID}
834 err := ds.Get(c, &stored) 835 err := ds.Get(c, &stored)
835 if err != nil && err != ds.ErrNoSuchEntity { 836 if err != nil && err != ds.ErrNoSuchEntity {
836 return err 837 return err
837 } 838 }
838 modified := stored 839 modified := stored
839 err = txn(c, &modified, err == ds.ErrNoSuchEntity) 840 err = txn(c, &modified, err == ds.ErrNoSuchEntity)
840 if err != nil && err != errSkipPut { 841 if err != nil && err != errSkipPut {
841 » » » fatal = !errors.IsTransient(err) 842 » » » fatal = !transient.Tag.In(err)
842 return err 843 return err
843 } 844 }
844 if err != errSkipPut && !modified.isEqual(&stored) { 845 if err != errSkipPut && !modified.isEqual(&stored) {
845 return ds.Put(c, &modified) 846 return ds.Put(c, &modified)
846 } 847 }
847 return nil 848 return nil
848 }, &defaultTransactionOptions) 849 }, &defaultTransactionOptions)
849 if err != nil { 850 if err != nil {
850 logging.Errorf(c, "Job transaction failed: %s", err) 851 logging.Errorf(c, "Job transaction failed: %s", err)
851 if fatal { 852 if fatal {
852 return err 853 return err
853 } 854 }
854 // By now err is already transient (since 'fatal' is false) or i t is commit 855 // By now err is already transient (since 'fatal' is false) or i t is commit
855 // error (i.e. produced by RunInTransaction itself, not by its c allback). 856 // error (i.e. produced by RunInTransaction itself, not by its c allback).
856 // Need to wrap commit errors too. 857 // Need to wrap commit errors too.
857 » » return errors.WrapTransient(err) 858 » » return transient.Tag.Apply(err)
858 } 859 }
859 if attempt > 1 { 860 if attempt > 1 {
860 logging.Infof(c, "Committed on %d attempt", attempt) 861 logging.Infof(c, "Committed on %d attempt", attempt)
861 } 862 }
862 return nil 863 return nil
863 } 864 }
864 865
865 // rollSM is called under transaction to perform a single state machine 866 // rollSM is called under transaction to perform a single state machine
866 // transition. It sets up StateMachine instance, calls the callback, mutates 867 // transition. It sets up StateMachine instance, calls the callback, mutates
867 // job.State in place (with a new state) and enqueues all emitted actions to 868 // job.State in place (with a new state) and enqueues all emitted actions to
868 // task queues. 869 // task queues.
869 func (e *engineImpl) rollSM(c context.Context, job *Job, cb func(*StateMachine) error) error { 870 func (e *engineImpl) rollSM(c context.Context, job *Job, cb func(*StateMachine) error) error {
870 sched, err := job.parseSchedule() 871 sched, err := job.parseSchedule()
871 if err != nil { 872 if err != nil {
872 return fmt.Errorf("bad schedule %q - %s", job.effectiveSchedule( ), err) 873 return fmt.Errorf("bad schedule %q - %s", job.effectiveSchedule( ), err)
873 } 874 }
874 now := clock.Now(c).UTC() 875 now := clock.Now(c).UTC()
875 rnd := mathrand.Get(c) 876 rnd := mathrand.Get(c)
876 sm := StateMachine{ 877 sm := StateMachine{
877 State: job.State, 878 State: job.State,
878 Now: now, 879 Now: now,
879 Schedule: sched, 880 Schedule: sched,
880 Nonce: func() int64 { return rnd.Int63() + 1 }, 881 Nonce: func() int64 { return rnd.Int63() + 1 },
881 Context: c, 882 Context: c,
882 } 883 }
883 // All errors returned by state machine transition changes are transient . 884 // All errors returned by state machine transition changes are transient .
884 // Fatal errors (when we have them) should be reflected as a state chang ing 885 // Fatal errors (when we have them) should be reflected as a state chang ing
885 // into "BROKEN" state. 886 // into "BROKEN" state.
886 if err := cb(&sm); err != nil { 887 if err := cb(&sm); err != nil {
887 » » return errors.WrapTransient(err) 888 » » return transient.Tag.Apply(err)
888 } 889 }
889 if len(sm.Actions) != 0 { 890 if len(sm.Actions) != 0 {
890 if err := e.enqueueJobActions(c, job.JobID, sm.Actions); err != nil { 891 if err := e.enqueueJobActions(c, job.JobID, sm.Actions); err != nil {
891 return err 892 return err
892 } 893 }
893 } 894 }
894 if sm.State.State != job.State.State { 895 if sm.State.State != job.State.State {
895 logging.Infof(c, "%s -> %s", job.State.State, sm.State.State) 896 logging.Infof(c, "%s -> %s", job.State.State, sm.State.State)
896 } 897 }
897 job.State = sm.State 898 job.State = sm.State
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
967 i := 0 968 i := 0
968 for queueName, tasks := range qs { 969 for queueName, tasks := range qs {
969 wg.Add(1) 970 wg.Add(1)
970 go func(i int, queueName string, tasks []*tq.Task) { 971 go func(i int, queueName string, tasks []*tq.Task) {
971 errs.Assign(i, tq.Add(c, queueName, tasks...)) 972 errs.Assign(i, tq.Add(c, queueName, tasks...))
972 wg.Done() 973 wg.Done()
973 }(i, queueName, tasks) 974 }(i, queueName, tasks)
974 i++ 975 i++
975 } 976 }
976 wg.Wait() 977 wg.Wait()
977 » return errors.WrapTransient(errs.Get()) 978 » return transient.Tag.Apply(errs.Get())
978 } 979 }
979 980
980 // enqueueInvTimers submits all timers emitted by an invocation manager by 981 // enqueueInvTimers submits all timers emitted by an invocation manager by
981 // adding corresponding tasks to the task queue. See ExecuteSerializedAction for 982 // adding corresponding tasks to the task queue. See ExecuteSerializedAction for
982 // place where these actions are interpreted. 983 // place where these actions are interpreted.
983 func (e *engineImpl) enqueueInvTimers(c context.Context, inv *Invocation, timers []invocationTimer) error { 984 func (e *engineImpl) enqueueInvTimers(c context.Context, inv *Invocation, timers []invocationTimer) error {
984 tasks := make([]*tq.Task, len(timers)) 985 tasks := make([]*tq.Task, len(timers))
985 for i, timer := range timers { 986 for i, timer := range timers {
986 payload, err := json.Marshal(actionTaskPayload{ 987 payload, err := json.Marshal(actionTaskPayload{
987 JobID: inv.JobKey.StringID(), 988 JobID: inv.JobKey.StringID(),
988 InvID: inv.ID, 989 InvID: inv.ID,
989 InvTimer: &timer, 990 InvTimer: &timer,
990 }) 991 })
991 if err != nil { 992 if err != nil {
992 return err 993 return err
993 } 994 }
994 tasks[i] = &tq.Task{ 995 tasks[i] = &tq.Task{
995 Path: e.TimersQueuePath, 996 Path: e.TimersQueuePath,
996 ETA: clock.Now(c).Add(timer.Delay), 997 ETA: clock.Now(c).Add(timer.Delay),
997 Payload: payload, 998 Payload: payload,
998 } 999 }
999 } 1000 }
1000 » return errors.WrapTransient(tq.Add(c, e.TimersQueueName, tasks...)) 1001 » return transient.Tag.Apply(tq.Add(c, e.TimersQueueName, tasks...))
1001 } 1002 }
1002 1003
1003 func (e *engineImpl) ExecuteSerializedAction(c context.Context, action []byte, r etryCount int) error { 1004 func (e *engineImpl) ExecuteSerializedAction(c context.Context, action []byte, r etryCount int) error {
1004 payload := actionTaskPayload{} 1005 payload := actionTaskPayload{}
1005 if err := json.Unmarshal(action, &payload); err != nil { 1006 if err := json.Unmarshal(action, &payload); err != nil {
1006 return err 1007 return err
1007 } 1008 }
1008 if payload.InvID == 0 { 1009 if payload.InvID == 0 {
1009 return e.executeJobAction(c, &payload, retryCount) 1010 return e.executeJobAction(c, &payload, retryCount)
1010 } 1011 }
(...skipping 272 matching lines...) Expand 10 before | Expand all | Expand 10 after
1283 Started: now, 1284 Started: now,
1284 Finished: now, 1285 Finished: now,
1285 Status: task.StatusOverrun, 1286 Status: task.StatusOverrun,
1286 } 1287 }
1287 if runningInvID == 0 { 1288 if runningInvID == 0 {
1288 inv.debugLog(c, "New invocation should be starting now, but prev ious one is still starting") 1289 inv.debugLog(c, "New invocation should be starting now, but prev ious one is still starting")
1289 } else { 1290 } else {
1290 inv.debugLog(c, "New invocation should be starting now, but prev ious one is still running: %d", runningInvID) 1291 inv.debugLog(c, "New invocation should be starting now, but prev ious one is still running: %d", runningInvID)
1291 } 1292 }
1292 inv.debugLog(c, "Total overruns thus far: %d", overruns) 1293 inv.debugLog(c, "Total overruns thus far: %d", overruns)
1293 » return errors.WrapTransient(ds.Put(c, &inv)) 1294 » return transient.Tag.Apply(ds.Put(c, &inv))
1294 } 1295 }
1295 1296
1296 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks. 1297 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks.
1297 // 1298 //
1298 // See also handlePubSubMessage, it is quite similar. 1299 // See also handlePubSubMessage, it is quite similar.
1299 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID int64, timer *invocationTimer) error { 1300 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID int64, timer *invocationTimer) error {
1300 c = logging.SetField(c, "JobID", jobID) 1301 c = logging.SetField(c, "JobID", jobID)
1301 c = logging.SetField(c, "InvID", invID) 1302 c = logging.SetField(c, "InvID", invID)
1302 1303
1303 logging.Infof(c, "Handling invocation timer %q", timer.Name) 1304 logging.Infof(c, "Handling invocation timer %q", timer.Name)
(...skipping 16 matching lines...) Expand all
1320 ctl, err := e.controllerForInvocation(c, inv) 1321 ctl, err := e.controllerForInvocation(c, inv)
1321 if err != nil { 1322 if err != nil {
1322 logging.Errorf(c, "Cannot get controller - %s", err) 1323 logging.Errorf(c, "Cannot get controller - %s", err)
1323 return err 1324 return err
1324 } 1325 }
1325 1326
1326 // Hand the message to the TaskManager. 1327 // Hand the message to the TaskManager.
1327 err = ctl.manager.HandleTimer(c, ctl, timer.Name, timer.Payload) 1328 err = ctl.manager.HandleTimer(c, ctl, timer.Name, timer.Payload)
1328 if err != nil { 1329 if err != nil {
1329 logging.Errorf(c, "Error when handling the timer - %s", err) 1330 logging.Errorf(c, "Error when handling the timer - %s", err)
1330 » » if !errors.IsTransient(err) && ctl.State().Status != task.Status Failed { 1331 » » if !transient.Tag.In(err) && ctl.State().Status != task.StatusFa iled {
1331 ctl.DebugLog("Fatal error when handling timer, aborting invocation - %s", err) 1332 ctl.DebugLog("Fatal error when handling timer, aborting invocation - %s", err)
1332 ctl.State().Status = task.StatusFailed 1333 ctl.State().Status = task.StatusFailed
1333 } 1334 }
1334 } 1335 }
1335 1336
1336 // Save anyway, to preserve the invocation log. 1337 // Save anyway, to preserve the invocation log.
1337 saveErr := ctl.Save(c) 1338 saveErr := ctl.Save(c)
1338 if saveErr != nil { 1339 if saveErr != nil {
1339 logging.Errorf(c, "Error when saving the invocation - %s", saveE rr) 1340 logging.Errorf(c, "Error when saving the invocation - %s", saveE rr)
1340 } 1341 }
1341 1342
1342 // Retry the delivery if at least one error is transient. HandleTimer mu st be 1343 // Retry the delivery if at least one error is transient. HandleTimer mu st be
1343 // idempotent. 1344 // idempotent.
1344 switch { 1345 switch {
1345 case err == nil && saveErr == nil: 1346 case err == nil && saveErr == nil:
1346 return nil 1347 return nil
1347 » case errors.IsTransient(saveErr): 1348 » case transient.Tag.In(err):
1348 return saveErr 1349 return saveErr
1349 default: 1350 default:
1350 return err // transient or fatal 1351 return err // transient or fatal
1351 } 1352 }
1352 } 1353 }
1353 1354
1354 // startInvocation is called via task queue to start running a job. This call 1355 // startInvocation is called via task queue to start running a job. This call
1355 // may be retried by task queue service. 1356 // may be retried by task queue service.
1356 func (e *engineImpl) startInvocation(c context.Context, jobID string, invocation Nonce int64, 1357 func (e *engineImpl) startInvocation(c context.Context, jobID string, invocation Nonce int64,
1357 triggeredBy identity.Identity, retryCount int) error { 1358 triggeredBy identity.Identity, retryCount int) error {
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after
1487 // In either case, invocation never ends up in StatusStarting state. 1488 // In either case, invocation never ends up in StatusStarting state.
1488 err = ctl.manager.LaunchTask(c, ctl) 1489 err = ctl.manager.LaunchTask(c, ctl)
1489 if ctl.State().Status == task.StatusStarting { 1490 if ctl.State().Status == task.StatusStarting {
1490 ctl.State().Status = task.StatusFailed 1491 ctl.State().Status = task.StatusFailed
1491 if err == nil { 1492 if err == nil {
1492 err = fmt.Errorf("LaunchTask didn't move invocation out of StatusStarting") 1493 err = fmt.Errorf("LaunchTask didn't move invocation out of StatusStarting")
1493 } 1494 }
1494 } 1495 }
1495 1496
1496 // Give up retrying on transient errors after some number of attempts. 1497 // Give up retrying on transient errors after some number of attempts.
1497 » if errors.IsTransient(err) && retryCount+1 >= invocationRetryLimit { 1498 » if transient.Tag.In(err) && retryCount+1 >= invocationRetryLimit {
1498 err = fmt.Errorf("Too many retries, giving up (original error - %s)", err) 1499 err = fmt.Errorf("Too many retries, giving up (original error - %s)", err)
1499 } 1500 }
1500 1501
1501 // If asked to retry the invocation (by returning a transient error), do not 1502 // If asked to retry the invocation (by returning a transient error), do not
1502 // touch Job entity when saving the current (failed) invocation. That wa y Job 1503 // touch Job entity when saving the current (failed) invocation. That wa y Job
1503 // stays in "QUEUED" state (indicating it's queued for a new invocation) . 1504 // stays in "QUEUED" state (indicating it's queued for a new invocation) .
1504 » retryInvocation := errors.IsTransient(err) 1505 » if saveErr := ctl.saveImpl(c, !transient.Tag.In(err)); saveErr != nil {
1505 » if saveErr := ctl.saveImpl(c, !retryInvocation); saveErr != nil {
1506 logging.Errorf(c, "Failed to save invocation state - %s", saveEr r) 1506 logging.Errorf(c, "Failed to save invocation state - %s", saveEr r)
1507 if err == nil { 1507 if err == nil {
1508 err = saveErr 1508 err = saveErr
1509 } 1509 }
1510 } 1510 }
1511 1511
1512 // Returning transient error here causes the task queue to retry the tas k. 1512 // Returning transient error here causes the task queue to retry the tas k.
1513 if err != nil { 1513 if err != nil {
1514 logging.WithError(err).Errorf(c, "Invocation failed to start") 1514 logging.WithError(err).Errorf(c, "Invocation failed to start")
1515 } 1515 }
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after
1655 _, sub := e.genTopicAndSubNames(c, taskManagerName, publisher) 1655 _, sub := e.genTopicAndSubNames(c, taskManagerName, publisher)
1656 msg, ack, err := pullSubcription(c, sub, "") 1656 msg, ack, err := pullSubcription(c, sub, "")
1657 if err != nil { 1657 if err != nil {
1658 return err 1658 return err
1659 } 1659 }
1660 if msg == nil { 1660 if msg == nil {
1661 logging.Infof(c, "No new PubSub messages") 1661 logging.Infof(c, "No new PubSub messages")
1662 return nil 1662 return nil
1663 } 1663 }
1664 err = e.handlePubSubMessage(c, msg) 1664 err = e.handlePubSubMessage(c, msg)
1665 » if err == nil || !errors.IsTransient(err) { 1665 » if err == nil || !transient.Tag.In(err) {
1666 ack() // ack only on success of fatal errors (to stop redelivery ) 1666 ack() // ack only on success of fatal errors (to stop redelivery )
1667 } 1667 }
1668 return err 1668 return err
1669 } 1669 }
1670 1670
1671 func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe ssage) error { 1671 func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe ssage) error {
1672 logging.Infof(c, "Received PubSub message %q", msg.MessageId) 1672 logging.Infof(c, "Received PubSub message %q", msg.MessageId)
1673 1673
1674 // Extract Job and Invocation ID from validated auth_token. 1674 // Extract Job and Invocation ID from validated auth_token.
1675 var jobID string 1675 var jobID string
(...skipping 30 matching lines...) Expand all
1706 ctl, err := e.controllerForInvocation(c, inv) 1706 ctl, err := e.controllerForInvocation(c, inv)
1707 if err != nil { 1707 if err != nil {
1708 logging.Errorf(c, "Cannot get controller - %s", err) 1708 logging.Errorf(c, "Cannot get controller - %s", err)
1709 return err 1709 return err
1710 } 1710 }
1711 1711
1712 // Hand the message to the TaskManager. 1712 // Hand the message to the TaskManager.
1713 err = ctl.manager.HandleNotification(c, ctl, msg) 1713 err = ctl.manager.HandleNotification(c, ctl, msg)
1714 if err != nil { 1714 if err != nil {
1715 logging.Errorf(c, "Error when handling the message - %s", err) 1715 logging.Errorf(c, "Error when handling the message - %s", err)
1716 » » if !errors.IsTransient(err) && ctl.State().Status != task.Status Failed { 1716 » » if !transient.Tag.In(err) && ctl.State().Status != task.StatusFa iled {
1717 ctl.DebugLog("Fatal error when handling PubSub notificat ion, aborting invocation - %s", err) 1717 ctl.DebugLog("Fatal error when handling PubSub notificat ion, aborting invocation - %s", err)
1718 ctl.State().Status = task.StatusFailed 1718 ctl.State().Status = task.StatusFailed
1719 } 1719 }
1720 } 1720 }
1721 1721
1722 // Save anyway, to preserve the invocation log. 1722 // Save anyway, to preserve the invocation log.
1723 saveErr := ctl.Save(c) 1723 saveErr := ctl.Save(c)
1724 if saveErr != nil { 1724 if saveErr != nil {
1725 logging.Errorf(c, "Error when saving the invocation - %s", saveE rr) 1725 logging.Errorf(c, "Error when saving the invocation - %s", saveE rr)
1726 } 1726 }
1727 1727
1728 // Retry the delivery if at least one error is transient. HandleNotifica tion 1728 // Retry the delivery if at least one error is transient. HandleNotifica tion
1729 // must be idempotent. 1729 // must be idempotent.
1730 switch { 1730 switch {
1731 case err == nil && saveErr == nil: 1731 case err == nil && saveErr == nil:
1732 return nil 1732 return nil
1733 » case errors.IsTransient(saveErr): 1733 » case transient.Tag.In(saveErr):
1734 return saveErr 1734 return saveErr
1735 default: 1735 default:
1736 return err // transient or fatal 1736 return err // transient or fatal
1737 } 1737 }
1738 } 1738 }
1739 1739
1740 //////////////////////////////////////////////////////////////////////////////// 1740 ////////////////////////////////////////////////////////////////////////////////
1741 // TaskController. 1741 // TaskController.
1742 1742
1743 type taskController struct { 1743 type taskController struct {
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
1824 } 1824 }
1825 1825
1826 // Save is part of task.Controller interface. 1826 // Save is part of task.Controller interface.
1827 func (ctl *taskController) Save(ctx context.Context) error { 1827 func (ctl *taskController) Save(ctx context.Context) error {
1828 return ctl.saveImpl(ctx, true) 1828 return ctl.saveImpl(ctx, true)
1829 } 1829 }
1830 1830
1831 // errUpdateConflict means Invocation is being modified by two TaskController's 1831 // errUpdateConflict means Invocation is being modified by two TaskController's
1832 // concurrently. It should not be happening often. If it happens, task queue 1832 // concurrently. It should not be happening often. If it happens, task queue
1833 // call is retried to rerun the two-part transaction from scratch. 1833 // call is retried to rerun the two-part transaction from scratch.
1834 var errUpdateConflict = errors.WrapTransient(errors.New("concurrent modification s of single Invocation")) 1834 var errUpdateConflict = errors.New("concurrent modifications of single Invocatio n", transient.Tag)
1835 1835
1836 // saveImpl uploads updated Invocation to the datastore. If updateJob is true, 1836 // saveImpl uploads updated Invocation to the datastore. If updateJob is true,
1837 // it will also roll corresponding state machine forward. 1837 // it will also roll corresponding state machine forward.
1838 func (ctl *taskController) saveImpl(ctx context.Context, updateJob bool) (err er ror) { 1838 func (ctl *taskController) saveImpl(ctx context.Context, updateJob bool) (err er ror) {
1839 // Mutate copy in case transaction below fails. Also unpacks ctl.state b ack 1839 // Mutate copy in case transaction below fails. Also unpacks ctl.state b ack
1840 // into the entity (reverse of 'populateState'). 1840 // into the entity (reverse of 'populateState').
1841 saving := ctl.saved 1841 saving := ctl.saved
1842 saving.Status = ctl.state.Status 1842 saving.Status = ctl.state.Status
1843 saving.TaskData = append([]byte(nil), ctl.state.TaskData...) 1843 saving.TaskData = append([]byte(nil), ctl.state.TaskData...)
1844 saving.ViewURL = ctl.state.ViewURL 1844 saving.ViewURL = ctl.state.ViewURL
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
1880 // expect it to be. 1880 // expect it to be.
1881 mostRecent := Invocation{ 1881 mostRecent := Invocation{
1882 ID: saving.ID, 1882 ID: saving.ID,
1883 JobKey: saving.JobKey, 1883 JobKey: saving.JobKey,
1884 } 1884 }
1885 switch err := ds.Get(c, &mostRecent); { 1885 switch err := ds.Get(c, &mostRecent); {
1886 case err == ds.ErrNoSuchEntity: // should not happen 1886 case err == ds.ErrNoSuchEntity: // should not happen
1887 logging.Errorf(c, "Invocation is suddenly gone") 1887 logging.Errorf(c, "Invocation is suddenly gone")
1888 return errors.New("invocation is suddenly gone") 1888 return errors.New("invocation is suddenly gone")
1889 case err != nil: 1889 case err != nil:
1890 » » » return errors.WrapTransient(err) 1890 » » » return transient.Tag.Apply(err)
1891 } 1891 }
1892 1892
1893 // Make sure no one touched it while we were handling the invoca tion. 1893 // Make sure no one touched it while we were handling the invoca tion.
1894 if saving.MutationsCount != mostRecent.MutationsCount+1 { 1894 if saving.MutationsCount != mostRecent.MutationsCount+1 {
1895 logging.Errorf(c, "Invocation was modified by someone el se while we were handling it") 1895 logging.Errorf(c, "Invocation was modified by someone el se while we were handling it")
1896 return errUpdateConflict 1896 return errUpdateConflict
1897 } 1897 }
1898 1898
1899 // Store the invocation entity and schedule invocation timers re gardless of 1899 // Store the invocation entity and schedule invocation timers re gardless of
1900 // the current state of the Job entity. The table of all invocat ions is 1900 // the current state of the Job entity. The table of all invocat ions is
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
1937 } 1937 }
1938 if hasFinished { 1938 if hasFinished {
1939 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or { 1939 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err or {
1940 sm.OnInvocationDone(saving.ID) 1940 sm.OnInvocationDone(saving.ID)
1941 return nil 1941 return nil
1942 }) 1942 })
1943 } 1943 }
1944 return nil 1944 return nil
1945 }) 1945 })
1946 } 1946 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698