| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // Package engine implements the core logic of the scheduler service. | 5 // Package engine implements the core logic of the scheduler service. |
| 6 package engine | 6 package engine |
| 7 | 7 |
| 8 import ( | 8 import ( |
| 9 "bytes" | 9 "bytes" |
| 10 "encoding/json" | 10 "encoding/json" |
| (...skipping 14 matching lines...) Expand all Loading... |
| 25 ds "github.com/luci/gae/service/datastore" | 25 ds "github.com/luci/gae/service/datastore" |
| 26 "github.com/luci/gae/service/info" | 26 "github.com/luci/gae/service/info" |
| 27 mc "github.com/luci/gae/service/memcache" | 27 mc "github.com/luci/gae/service/memcache" |
| 28 tq "github.com/luci/gae/service/taskqueue" | 28 tq "github.com/luci/gae/service/taskqueue" |
| 29 | 29 |
| 30 "github.com/luci/luci-go/common/clock" | 30 "github.com/luci/luci-go/common/clock" |
| 31 "github.com/luci/luci-go/common/data/rand/mathrand" | 31 "github.com/luci/luci-go/common/data/rand/mathrand" |
| 32 "github.com/luci/luci-go/common/data/stringset" | 32 "github.com/luci/luci-go/common/data/stringset" |
| 33 "github.com/luci/luci-go/common/errors" | 33 "github.com/luci/luci-go/common/errors" |
| 34 "github.com/luci/luci-go/common/logging" | 34 "github.com/luci/luci-go/common/logging" |
| 35 "github.com/luci/luci-go/common/retry" |
| 35 "github.com/luci/luci-go/server/auth" | 36 "github.com/luci/luci-go/server/auth" |
| 36 "github.com/luci/luci-go/server/auth/identity" | 37 "github.com/luci/luci-go/server/auth/identity" |
| 37 "github.com/luci/luci-go/server/auth/signing" | 38 "github.com/luci/luci-go/server/auth/signing" |
| 38 "github.com/luci/luci-go/server/tokens" | 39 "github.com/luci/luci-go/server/tokens" |
| 39 | 40 |
| 40 "github.com/luci/luci-go/scheduler/appengine/catalog" | 41 "github.com/luci/luci-go/scheduler/appengine/catalog" |
| 41 "github.com/luci/luci-go/scheduler/appengine/schedule" | 42 "github.com/luci/luci-go/scheduler/appengine/schedule" |
| 42 "github.com/luci/luci-go/scheduler/appengine/task" | 43 "github.com/luci/luci-go/scheduler/appengine/task" |
| 43 ) | 44 ) |
| 44 | 45 |
| (...skipping 509 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 554 // Check the global cache. | 555 // Check the global cache. |
| 555 switch _, err := mc.GetKey(c, key); { | 556 switch _, err := mc.GetKey(c, key); { |
| 556 case err == nil: | 557 case err == nil: |
| 557 e.lock.Lock() | 558 e.lock.Lock() |
| 558 defer e.lock.Unlock() | 559 defer e.lock.Unlock() |
| 559 e.doneFlags[key] = true | 560 e.doneFlags[key] = true |
| 560 return nil | 561 return nil |
| 561 case err == mc.ErrCacheMiss: | 562 case err == mc.ErrCacheMiss: |
| 562 break | 563 break |
| 563 default: | 564 default: |
| 564 » » return errors.WrapTransient(err) | 565 » » return retry.Tag.Apply(err) |
| 565 } | 566 } |
| 566 | 567 |
| 567 // Do it. | 568 // Do it. |
| 568 if err := cb(); err != nil { | 569 if err := cb(); err != nil { |
| 569 return err | 570 return err |
| 570 } | 571 } |
| 571 | 572 |
| 572 // Store in the global cache. Ignore errors, it's not a big deal. | 573 // Store in the global cache. Ignore errors, it's not a big deal. |
| 573 item := mc.NewItem(c, key) | 574 item := mc.NewItem(c, key) |
| 574 item.SetValue([]byte("ok")) | 575 item.SetValue([]byte("ok")) |
| 575 item.SetExpiration(24 * time.Hour) | 576 item.SetExpiration(24 * time.Hour) |
| 576 if err := mc.Set(c, item); err != nil { | 577 if err := mc.Set(c, item); err != nil { |
| 577 logging.Warningf(c, "Failed to write item to memcache - %s", err
) | 578 logging.Warningf(c, "Failed to write item to memcache - %s", err
) |
| 578 } | 579 } |
| 579 | 580 |
| 580 // Store in the local cache. | 581 // Store in the local cache. |
| 581 e.lock.Lock() | 582 e.lock.Lock() |
| 582 defer e.lock.Unlock() | 583 defer e.lock.Unlock() |
| 583 e.doneFlags[key] = true | 584 e.doneFlags[key] = true |
| 584 return nil | 585 return nil |
| 585 } | 586 } |
| 586 | 587 |
| 587 func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) { | 588 func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) { |
| 588 q := ds.NewQuery("Job"). | 589 q := ds.NewQuery("Job"). |
| 589 Eq("Enabled", true). | 590 Eq("Enabled", true). |
| 590 Project("ProjectID"). | 591 Project("ProjectID"). |
| 591 Distinct(true) | 592 Distinct(true) |
| 592 entities := []Job{} | 593 entities := []Job{} |
| 593 if err := ds.GetAll(c, q, &entities); err != nil { | 594 if err := ds.GetAll(c, q, &entities); err != nil { |
| 594 » » return nil, errors.WrapTransient(err) | 595 » » return nil, retry.Tag.Apply(err) |
| 595 } | 596 } |
| 596 // Filter out duplicates, sort. | 597 // Filter out duplicates, sort. |
| 597 projects := stringset.New(len(entities)) | 598 projects := stringset.New(len(entities)) |
| 598 for _, ent := range entities { | 599 for _, ent := range entities { |
| 599 projects.Add(ent.ProjectID) | 600 projects.Add(ent.ProjectID) |
| 600 } | 601 } |
| 601 out := projects.ToSlice() | 602 out := projects.ToSlice() |
| 602 sort.Strings(out) | 603 sort.Strings(out) |
| 603 return out, nil | 604 return out, nil |
| 604 } | 605 } |
| 605 | 606 |
| 606 func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) { | 607 func (e *engineImpl) GetAllJobs(c context.Context) ([]*Job, error) { |
| 607 q := ds.NewQuery("Job").Eq("Enabled", true) | 608 q := ds.NewQuery("Job").Eq("Enabled", true) |
| 608 return e.queryEnabledJobs(c, q) | 609 return e.queryEnabledJobs(c, q) |
| 609 } | 610 } |
| 610 | 611 |
| 611 func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job
, error) { | 612 func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job
, error) { |
| 612 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) | 613 q := ds.NewQuery("Job").Eq("Enabled", true).Eq("ProjectID", projectID) |
| 613 return e.queryEnabledJobs(c, q) | 614 return e.queryEnabledJobs(c, q) |
| 614 } | 615 } |
| 615 | 616 |
| 616 func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e
rror) { | 617 func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, e
rror) { |
| 617 entities := []*Job{} | 618 entities := []*Job{} |
| 618 if err := ds.GetAll(c, q, &entities); err != nil { | 619 if err := ds.GetAll(c, q, &entities); err != nil { |
| 619 » » return nil, errors.WrapTransient(err) | 620 » » return nil, retry.Tag.Apply(err) |
| 620 } | 621 } |
| 621 // Non-ancestor query used, need to recheck filters. | 622 // Non-ancestor query used, need to recheck filters. |
| 622 filtered := make([]*Job, 0, len(entities)) | 623 filtered := make([]*Job, 0, len(entities)) |
| 623 for _, job := range entities { | 624 for _, job := range entities { |
| 624 if job.Enabled { | 625 if job.Enabled { |
| 625 filtered = append(filtered, job) | 626 filtered = append(filtered, job) |
| 626 } | 627 } |
| 627 } | 628 } |
| 628 return filtered, nil | 629 return filtered, nil |
| 629 } | 630 } |
| 630 | 631 |
| 631 func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { | 632 func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { |
| 632 job := &Job{JobID: jobID} | 633 job := &Job{JobID: jobID} |
| 633 switch err := ds.Get(c, job); { | 634 switch err := ds.Get(c, job); { |
| 634 case err == nil: | 635 case err == nil: |
| 635 return job, nil | 636 return job, nil |
| 636 case err == ds.ErrNoSuchEntity: | 637 case err == ds.ErrNoSuchEntity: |
| 637 return nil, nil | 638 return nil, nil |
| 638 default: | 639 default: |
| 639 » » return nil, errors.WrapTransient(err) | 640 » » return nil, retry.Tag.Apply(err) |
| 640 } | 641 } |
| 641 } | 642 } |
| 642 | 643 |
| 643 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i
nt, cursor string) ([]*Invocation, string, error) { | 644 func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i
nt, cursor string) ([]*Invocation, string, error) { |
| 644 if pageSize == 0 || pageSize > 500 { | 645 if pageSize == 0 || pageSize > 500 { |
| 645 pageSize = 500 | 646 pageSize = 500 |
| 646 } | 647 } |
| 647 | 648 |
| 648 // Deserialize the cursor. | 649 // Deserialize the cursor. |
| 649 var cursorObj ds.Cursor | 650 var cursorObj ds.Cursor |
| (...skipping 23 matching lines...) Expand all Loading... |
| 673 return nil | 674 return nil |
| 674 } | 675 } |
| 675 c, err := getCursor() | 676 c, err := getCursor() |
| 676 if err != nil { | 677 if err != nil { |
| 677 return err | 678 return err |
| 678 } | 679 } |
| 679 newCursor = c.String() | 680 newCursor = c.String() |
| 680 return ds.Stop | 681 return ds.Stop |
| 681 }) | 682 }) |
| 682 if err != nil { | 683 if err != nil { |
| 683 » » return nil, "", errors.WrapTransient(err) | 684 » » return nil, "", retry.Tag.Apply(err) |
| 684 } | 685 } |
| 685 return out, newCursor, nil | 686 return out, newCursor, nil |
| 686 } | 687 } |
| 687 | 688 |
| 688 func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64)
(*Invocation, error) { | 689 func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64)
(*Invocation, error) { |
| 689 inv := &Invocation{ | 690 inv := &Invocation{ |
| 690 ID: invID, | 691 ID: invID, |
| 691 JobKey: ds.NewKey(c, "Job", jobID, 0, nil), | 692 JobKey: ds.NewKey(c, "Job", jobID, 0, nil), |
| 692 } | 693 } |
| 693 switch err := ds.Get(c, inv); { | 694 switch err := ds.Get(c, inv); { |
| 694 case err == nil: | 695 case err == nil: |
| 695 return inv, nil | 696 return inv, nil |
| 696 case err == ds.ErrNoSuchEntity: | 697 case err == ds.ErrNoSuchEntity: |
| 697 return nil, nil | 698 return nil, nil |
| 698 default: | 699 default: |
| 699 » » return nil, errors.WrapTransient(err) | 700 » » return nil, retry.Tag.Apply(err) |
| 700 } | 701 } |
| 701 } | 702 } |
| 702 | 703 |
| 703 func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([
]*Invocation, error) { | 704 func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([
]*Invocation, error) { |
| 704 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) | 705 q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) |
| 705 entities := []*Invocation{} | 706 entities := []*Invocation{} |
| 706 if err := ds.GetAll(c, q, &entities); err != nil { | 707 if err := ds.GetAll(c, q, &entities); err != nil { |
| 707 » » return nil, errors.WrapTransient(err) | 708 » » return nil, retry.Tag.Apply(err) |
| 708 } | 709 } |
| 709 return entities, nil | 710 return entities, nil |
| 710 } | 711 } |
| 711 | 712 |
| 712 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs
[]catalog.Definition) error { | 713 func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs
[]catalog.Definition) error { |
| 713 // JobID -> *Job map. | 714 // JobID -> *Job map. |
| 714 existing, err := e.getProjectJobs(c, projectID) | 715 existing, err := e.getProjectJobs(c, projectID) |
| 715 if err != nil { | 716 if err != nil { |
| 716 return err | 717 return err |
| 717 } | 718 } |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 752 go func(i int, jobID string) { | 753 go func(i int, jobID string) { |
| 753 disableErrs.Assign(i, e.disableJob(c, jobID)) | 754 disableErrs.Assign(i, e.disableJob(c, jobID)) |
| 754 wg.Done() | 755 wg.Done() |
| 755 }(i, jobID) | 756 }(i, jobID) |
| 756 } | 757 } |
| 757 | 758 |
| 758 wg.Wait() | 759 wg.Wait() |
| 759 if updateErrs.Get() == nil && disableErrs.Get() == nil { | 760 if updateErrs.Get() == nil && disableErrs.Get() == nil { |
| 760 return nil | 761 return nil |
| 761 } | 762 } |
| 762 » return errors.WrapTransient(errors.NewMultiError(updateErrs.Get(), disab
leErrs.Get())) | 763 » return retry.Tag.Apply(errors.NewMultiError(updateErrs.Get(), disableErr
s.Get())) |
| 763 } | 764 } |
| 764 | 765 |
| 765 func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error { | 766 func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error { |
| 766 if !info.IsDevAppServer(c) { | 767 if !info.IsDevAppServer(c) { |
| 767 return errors.New("ResetAllJobsOnDevServer must not be used in p
roduction") | 768 return errors.New("ResetAllJobsOnDevServer must not be used in p
roduction") |
| 768 } | 769 } |
| 769 q := ds.NewQuery("Job").Eq("Enabled", true) | 770 q := ds.NewQuery("Job").Eq("Enabled", true) |
| 770 keys := []*ds.Key{} | 771 keys := []*ds.Key{} |
| 771 if err := ds.GetAll(c, q, &keys); err != nil { | 772 if err := ds.GetAll(c, q, &keys); err != nil { |
| 772 » » return errors.WrapTransient(err) | 773 » » return retry.Tag.Apply(err) |
| 773 } | 774 } |
| 774 wg := sync.WaitGroup{} | 775 wg := sync.WaitGroup{} |
| 775 errs := errors.NewLazyMultiError(len(keys)) | 776 errs := errors.NewLazyMultiError(len(keys)) |
| 776 for i, key := range keys { | 777 for i, key := range keys { |
| 777 wg.Add(1) | 778 wg.Add(1) |
| 778 go func(i int, key *ds.Key) { | 779 go func(i int, key *ds.Key) { |
| 779 errs.Assign(i, e.resetJobOnDevServer(c, key.StringID())) | 780 errs.Assign(i, e.resetJobOnDevServer(c, key.StringID())) |
| 780 wg.Done() | 781 wg.Done() |
| 781 }(i, key) | 782 }(i, key) |
| 782 } | 783 } |
| 783 wg.Wait() | 784 wg.Wait() |
| 784 » return errors.WrapTransient(errs.Get()) | 785 » return retry.Tag.Apply(errs.Get()) |
| 785 } | 786 } |
| 786 | 787 |
| 787 // getProjectJobs fetches from ds all enabled jobs belonging to a given | 788 // getProjectJobs fetches from ds all enabled jobs belonging to a given |
| 788 // project. | 789 // project. |
| 789 func (e *engineImpl) getProjectJobs(c context.Context, projectID string) (map[st
ring]*Job, error) { | 790 func (e *engineImpl) getProjectJobs(c context.Context, projectID string) (map[st
ring]*Job, error) { |
| 790 q := ds.NewQuery("Job"). | 791 q := ds.NewQuery("Job"). |
| 791 Eq("Enabled", true). | 792 Eq("Enabled", true). |
| 792 Eq("ProjectID", projectID) | 793 Eq("ProjectID", projectID) |
| 793 entities := []*Job{} | 794 entities := []*Job{} |
| 794 if err := ds.GetAll(c, q, &entities); err != nil { | 795 if err := ds.GetAll(c, q, &entities); err != nil { |
| 795 » » return nil, errors.WrapTransient(err) | 796 » » return nil, retry.Tag.Apply(err) |
| 796 } | 797 } |
| 797 out := make(map[string]*Job, len(entities)) | 798 out := make(map[string]*Job, len(entities)) |
| 798 for _, job := range entities { | 799 for _, job := range entities { |
| 799 if job.Enabled && job.ProjectID == projectID { | 800 if job.Enabled && job.ProjectID == projectID { |
| 800 out[job.JobID] = job | 801 out[job.JobID] = job |
| 801 } | 802 } |
| 802 } | 803 } |
| 803 return out, nil | 804 return out, nil |
| 804 } | 805 } |
| 805 | 806 |
| (...skipping 25 matching lines...) Expand all Loading... |
| 831 logging.Warningf(c, "Retrying transaction...") | 832 logging.Warningf(c, "Retrying transaction...") |
| 832 } | 833 } |
| 833 stored := Job{JobID: jobID} | 834 stored := Job{JobID: jobID} |
| 834 err := ds.Get(c, &stored) | 835 err := ds.Get(c, &stored) |
| 835 if err != nil && err != ds.ErrNoSuchEntity { | 836 if err != nil && err != ds.ErrNoSuchEntity { |
| 836 return err | 837 return err |
| 837 } | 838 } |
| 838 modified := stored | 839 modified := stored |
| 839 err = txn(c, &modified, err == ds.ErrNoSuchEntity) | 840 err = txn(c, &modified, err == ds.ErrNoSuchEntity) |
| 840 if err != nil && err != errSkipPut { | 841 if err != nil && err != errSkipPut { |
| 841 » » » fatal = !errors.IsTransient(err) | 842 » » » fatal = !retry.Tag.In(err) |
| 842 return err | 843 return err |
| 843 } | 844 } |
| 844 if err != errSkipPut && !modified.isEqual(&stored) { | 845 if err != errSkipPut && !modified.isEqual(&stored) { |
| 845 return ds.Put(c, &modified) | 846 return ds.Put(c, &modified) |
| 846 } | 847 } |
| 847 return nil | 848 return nil |
| 848 }, &defaultTransactionOptions) | 849 }, &defaultTransactionOptions) |
| 849 if err != nil { | 850 if err != nil { |
| 850 logging.Errorf(c, "Job transaction failed: %s", err) | 851 logging.Errorf(c, "Job transaction failed: %s", err) |
| 851 if fatal { | 852 if fatal { |
| 852 return err | 853 return err |
| 853 } | 854 } |
| 854 // By now err is already transient (since 'fatal' is false) or i
t is commit | 855 // By now err is already transient (since 'fatal' is false) or i
t is commit |
| 855 // error (i.e. produced by RunInTransaction itself, not by its c
allback). | 856 // error (i.e. produced by RunInTransaction itself, not by its c
allback). |
| 856 // Need to wrap commit errors too. | 857 // Need to wrap commit errors too. |
| 857 » » return errors.WrapTransient(err) | 858 » » return retry.Tag.Apply(err) |
| 858 } | 859 } |
| 859 if attempt > 1 { | 860 if attempt > 1 { |
| 860 logging.Infof(c, "Committed on %d attempt", attempt) | 861 logging.Infof(c, "Committed on %d attempt", attempt) |
| 861 } | 862 } |
| 862 return nil | 863 return nil |
| 863 } | 864 } |
| 864 | 865 |
| 865 // rollSM is called under transaction to perform a single state machine | 866 // rollSM is called under transaction to perform a single state machine |
| 866 // transition. It sets up StateMachine instance, calls the callback, mutates | 867 // transition. It sets up StateMachine instance, calls the callback, mutates |
| 867 // job.State in place (with a new state) and enqueues all emitted actions to | 868 // job.State in place (with a new state) and enqueues all emitted actions to |
| 868 // task queues. | 869 // task queues. |
| 869 func (e *engineImpl) rollSM(c context.Context, job *Job, cb func(*StateMachine)
error) error { | 870 func (e *engineImpl) rollSM(c context.Context, job *Job, cb func(*StateMachine)
error) error { |
| 870 sched, err := job.parseSchedule() | 871 sched, err := job.parseSchedule() |
| 871 if err != nil { | 872 if err != nil { |
| 872 return fmt.Errorf("bad schedule %q - %s", job.effectiveSchedule(
), err) | 873 return fmt.Errorf("bad schedule %q - %s", job.effectiveSchedule(
), err) |
| 873 } | 874 } |
| 874 now := clock.Now(c).UTC() | 875 now := clock.Now(c).UTC() |
| 875 rnd := mathrand.Get(c) | 876 rnd := mathrand.Get(c) |
| 876 sm := StateMachine{ | 877 sm := StateMachine{ |
| 877 State: job.State, | 878 State: job.State, |
| 878 Now: now, | 879 Now: now, |
| 879 Schedule: sched, | 880 Schedule: sched, |
| 880 Nonce: func() int64 { return rnd.Int63() + 1 }, | 881 Nonce: func() int64 { return rnd.Int63() + 1 }, |
| 881 Context: c, | 882 Context: c, |
| 882 } | 883 } |
| 883 // All errors returned by state machine transition changes are transient
. | 884 // All errors returned by state machine transition changes are transient
. |
| 884 // Fatal errors (when we have them) should be reflected as a state chang
ing | 885 // Fatal errors (when we have them) should be reflected as a state chang
ing |
| 885 // into "BROKEN" state. | 886 // into "BROKEN" state. |
| 886 if err := cb(&sm); err != nil { | 887 if err := cb(&sm); err != nil { |
| 887 » » return errors.WrapTransient(err) | 888 » » return retry.Tag.Apply(err) |
| 888 } | 889 } |
| 889 if len(sm.Actions) != 0 { | 890 if len(sm.Actions) != 0 { |
| 890 if err := e.enqueueJobActions(c, job.JobID, sm.Actions); err !=
nil { | 891 if err := e.enqueueJobActions(c, job.JobID, sm.Actions); err !=
nil { |
| 891 return err | 892 return err |
| 892 } | 893 } |
| 893 } | 894 } |
| 894 if sm.State.State != job.State.State { | 895 if sm.State.State != job.State.State { |
| 895 logging.Infof(c, "%s -> %s", job.State.State, sm.State.State) | 896 logging.Infof(c, "%s -> %s", job.State.State, sm.State.State) |
| 896 } | 897 } |
| 897 job.State = sm.State | 898 job.State = sm.State |
| (...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 967 i := 0 | 968 i := 0 |
| 968 for queueName, tasks := range qs { | 969 for queueName, tasks := range qs { |
| 969 wg.Add(1) | 970 wg.Add(1) |
| 970 go func(i int, queueName string, tasks []*tq.Task) { | 971 go func(i int, queueName string, tasks []*tq.Task) { |
| 971 errs.Assign(i, tq.Add(c, queueName, tasks...)) | 972 errs.Assign(i, tq.Add(c, queueName, tasks...)) |
| 972 wg.Done() | 973 wg.Done() |
| 973 }(i, queueName, tasks) | 974 }(i, queueName, tasks) |
| 974 i++ | 975 i++ |
| 975 } | 976 } |
| 976 wg.Wait() | 977 wg.Wait() |
| 977 » return errors.WrapTransient(errs.Get()) | 978 » return retry.Tag.Apply(errs.Get()) |
| 978 } | 979 } |
| 979 | 980 |
| 980 // enqueueInvTimers submits all timers emitted by an invocation manager by | 981 // enqueueInvTimers submits all timers emitted by an invocation manager by |
| 981 // adding corresponding tasks to the task queue. See ExecuteSerializedAction for | 982 // adding corresponding tasks to the task queue. See ExecuteSerializedAction for |
| 982 // place where these actions are interpreted. | 983 // place where these actions are interpreted. |
| 983 func (e *engineImpl) enqueueInvTimers(c context.Context, inv *Invocation, timers
[]invocationTimer) error { | 984 func (e *engineImpl) enqueueInvTimers(c context.Context, inv *Invocation, timers
[]invocationTimer) error { |
| 984 tasks := make([]*tq.Task, len(timers)) | 985 tasks := make([]*tq.Task, len(timers)) |
| 985 for i, timer := range timers { | 986 for i, timer := range timers { |
| 986 payload, err := json.Marshal(actionTaskPayload{ | 987 payload, err := json.Marshal(actionTaskPayload{ |
| 987 JobID: inv.JobKey.StringID(), | 988 JobID: inv.JobKey.StringID(), |
| 988 InvID: inv.ID, | 989 InvID: inv.ID, |
| 989 InvTimer: &timer, | 990 InvTimer: &timer, |
| 990 }) | 991 }) |
| 991 if err != nil { | 992 if err != nil { |
| 992 return err | 993 return err |
| 993 } | 994 } |
| 994 tasks[i] = &tq.Task{ | 995 tasks[i] = &tq.Task{ |
| 995 Path: e.TimersQueuePath, | 996 Path: e.TimersQueuePath, |
| 996 ETA: clock.Now(c).Add(timer.Delay), | 997 ETA: clock.Now(c).Add(timer.Delay), |
| 997 Payload: payload, | 998 Payload: payload, |
| 998 } | 999 } |
| 999 } | 1000 } |
| 1000 » return errors.WrapTransient(tq.Add(c, e.TimersQueueName, tasks...)) | 1001 » return retry.Tag.Apply(tq.Add(c, e.TimersQueueName, tasks...)) |
| 1001 } | 1002 } |
| 1002 | 1003 |
| 1003 func (e *engineImpl) ExecuteSerializedAction(c context.Context, action []byte, r
etryCount int) error { | 1004 func (e *engineImpl) ExecuteSerializedAction(c context.Context, action []byte, r
etryCount int) error { |
| 1004 payload := actionTaskPayload{} | 1005 payload := actionTaskPayload{} |
| 1005 if err := json.Unmarshal(action, &payload); err != nil { | 1006 if err := json.Unmarshal(action, &payload); err != nil { |
| 1006 return err | 1007 return err |
| 1007 } | 1008 } |
| 1008 if payload.InvID == 0 { | 1009 if payload.InvID == 0 { |
| 1009 return e.executeJobAction(c, &payload, retryCount) | 1010 return e.executeJobAction(c, &payload, retryCount) |
| 1010 } | 1011 } |
| (...skipping 272 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1283 Started: now, | 1284 Started: now, |
| 1284 Finished: now, | 1285 Finished: now, |
| 1285 Status: task.StatusOverrun, | 1286 Status: task.StatusOverrun, |
| 1286 } | 1287 } |
| 1287 if runningInvID == 0 { | 1288 if runningInvID == 0 { |
| 1288 inv.debugLog(c, "New invocation should be starting now, but prev
ious one is still starting") | 1289 inv.debugLog(c, "New invocation should be starting now, but prev
ious one is still starting") |
| 1289 } else { | 1290 } else { |
| 1290 inv.debugLog(c, "New invocation should be starting now, but prev
ious one is still running: %d", runningInvID) | 1291 inv.debugLog(c, "New invocation should be starting now, but prev
ious one is still running: %d", runningInvID) |
| 1291 } | 1292 } |
| 1292 inv.debugLog(c, "Total overruns thus far: %d", overruns) | 1293 inv.debugLog(c, "Total overruns thus far: %d", overruns) |
| 1293 » return errors.WrapTransient(ds.Put(c, &inv)) | 1294 » return retry.Tag.Apply(ds.Put(c, &inv)) |
| 1294 } | 1295 } |
| 1295 | 1296 |
| 1296 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks. | 1297 // invocationTimerTick is called via Task Queue to handle AddTimer callbacks. |
| 1297 // | 1298 // |
| 1298 // See also handlePubSubMessage, it is quite similar. | 1299 // See also handlePubSubMessage, it is quite similar. |
| 1299 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
int64, timer *invocationTimer) error { | 1300 func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
int64, timer *invocationTimer) error { |
| 1300 c = logging.SetField(c, "JobID", jobID) | 1301 c = logging.SetField(c, "JobID", jobID) |
| 1301 c = logging.SetField(c, "InvID", invID) | 1302 c = logging.SetField(c, "InvID", invID) |
| 1302 | 1303 |
| 1303 logging.Infof(c, "Handling invocation timer %q", timer.Name) | 1304 logging.Infof(c, "Handling invocation timer %q", timer.Name) |
| (...skipping 16 matching lines...) Expand all Loading... |
| 1320 ctl, err := e.controllerForInvocation(c, inv) | 1321 ctl, err := e.controllerForInvocation(c, inv) |
| 1321 if err != nil { | 1322 if err != nil { |
| 1322 logging.Errorf(c, "Cannot get controller - %s", err) | 1323 logging.Errorf(c, "Cannot get controller - %s", err) |
| 1323 return err | 1324 return err |
| 1324 } | 1325 } |
| 1325 | 1326 |
| 1326 // Hand the message to the TaskManager. | 1327 // Hand the message to the TaskManager. |
| 1327 err = ctl.manager.HandleTimer(c, ctl, timer.Name, timer.Payload) | 1328 err = ctl.manager.HandleTimer(c, ctl, timer.Name, timer.Payload) |
| 1328 if err != nil { | 1329 if err != nil { |
| 1329 logging.Errorf(c, "Error when handling the timer - %s", err) | 1330 logging.Errorf(c, "Error when handling the timer - %s", err) |
| 1330 » » if !errors.IsTransient(err) && ctl.State().Status != task.Status
Failed { | 1331 » » if !retry.Tag.In(err) && ctl.State().Status != task.StatusFailed
{ |
| 1331 ctl.DebugLog("Fatal error when handling timer, aborting
invocation - %s", err) | 1332 ctl.DebugLog("Fatal error when handling timer, aborting
invocation - %s", err) |
| 1332 ctl.State().Status = task.StatusFailed | 1333 ctl.State().Status = task.StatusFailed |
| 1333 } | 1334 } |
| 1334 } | 1335 } |
| 1335 | 1336 |
| 1336 // Save anyway, to preserve the invocation log. | 1337 // Save anyway, to preserve the invocation log. |
| 1337 saveErr := ctl.Save(c) | 1338 saveErr := ctl.Save(c) |
| 1338 if saveErr != nil { | 1339 if saveErr != nil { |
| 1339 logging.Errorf(c, "Error when saving the invocation - %s", saveE
rr) | 1340 logging.Errorf(c, "Error when saving the invocation - %s", saveE
rr) |
| 1340 } | 1341 } |
| 1341 | 1342 |
| 1342 // Retry the delivery if at least one error is transient. HandleTimer mu
st be | 1343 // Retry the delivery if at least one error is transient. HandleTimer mu
st be |
| 1343 // idempotent. | 1344 // idempotent. |
| 1344 switch { | 1345 switch { |
| 1345 case err == nil && saveErr == nil: | 1346 case err == nil && saveErr == nil: |
| 1346 return nil | 1347 return nil |
| 1347 » case errors.IsTransient(saveErr): | 1348 » case retry.Tag.In(err): |
| 1348 return saveErr | 1349 return saveErr |
| 1349 default: | 1350 default: |
| 1350 return err // transient or fatal | 1351 return err // transient or fatal |
| 1351 } | 1352 } |
| 1352 } | 1353 } |
| 1353 | 1354 |
| 1354 // startInvocation is called via task queue to start running a job. This call | 1355 // startInvocation is called via task queue to start running a job. This call |
| 1355 // may be retried by task queue service. | 1356 // may be retried by task queue service. |
| 1356 func (e *engineImpl) startInvocation(c context.Context, jobID string, invocation
Nonce int64, | 1357 func (e *engineImpl) startInvocation(c context.Context, jobID string, invocation
Nonce int64, |
| 1357 triggeredBy identity.Identity, retryCount int) error { | 1358 triggeredBy identity.Identity, retryCount int) error { |
| (...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1487 // In either case, invocation never ends up in StatusStarting state. | 1488 // In either case, invocation never ends up in StatusStarting state. |
| 1488 err = ctl.manager.LaunchTask(c, ctl) | 1489 err = ctl.manager.LaunchTask(c, ctl) |
| 1489 if ctl.State().Status == task.StatusStarting { | 1490 if ctl.State().Status == task.StatusStarting { |
| 1490 ctl.State().Status = task.StatusFailed | 1491 ctl.State().Status = task.StatusFailed |
| 1491 if err == nil { | 1492 if err == nil { |
| 1492 err = fmt.Errorf("LaunchTask didn't move invocation out
of StatusStarting") | 1493 err = fmt.Errorf("LaunchTask didn't move invocation out
of StatusStarting") |
| 1493 } | 1494 } |
| 1494 } | 1495 } |
| 1495 | 1496 |
| 1496 // Give up retrying on transient errors after some number of attempts. | 1497 // Give up retrying on transient errors after some number of attempts. |
| 1497 » if errors.IsTransient(err) && retryCount+1 >= invocationRetryLimit { | 1498 » if retry.Tag.In(err) && retryCount+1 >= invocationRetryLimit { |
| 1498 err = fmt.Errorf("Too many retries, giving up (original error -
%s)", err) | 1499 err = fmt.Errorf("Too many retries, giving up (original error -
%s)", err) |
| 1499 } | 1500 } |
| 1500 | 1501 |
| 1501 // If asked to retry the invocation (by returning a transient error), do
not | 1502 // If asked to retry the invocation (by returning a transient error), do
not |
| 1502 // touch Job entity when saving the current (failed) invocation. That wa
y Job | 1503 // touch Job entity when saving the current (failed) invocation. That wa
y Job |
| 1503 // stays in "QUEUED" state (indicating it's queued for a new invocation)
. | 1504 // stays in "QUEUED" state (indicating it's queued for a new invocation)
. |
| 1504 » retryInvocation := errors.IsTransient(err) | 1505 » if saveErr := ctl.saveImpl(c, !retry.Tag.In(err)); saveErr != nil { |
| 1505 » if saveErr := ctl.saveImpl(c, !retryInvocation); saveErr != nil { | |
| 1506 logging.Errorf(c, "Failed to save invocation state - %s", saveEr
r) | 1506 logging.Errorf(c, "Failed to save invocation state - %s", saveEr
r) |
| 1507 if err == nil { | 1507 if err == nil { |
| 1508 err = saveErr | 1508 err = saveErr |
| 1509 } | 1509 } |
| 1510 } | 1510 } |
| 1511 | 1511 |
| 1512 // Returning transient error here causes the task queue to retry the tas
k. | 1512 // Returning transient error here causes the task queue to retry the tas
k. |
| 1513 if err != nil { | 1513 if err != nil { |
| 1514 logging.WithError(err).Errorf(c, "Invocation failed to start") | 1514 logging.WithError(err).Errorf(c, "Invocation failed to start") |
| 1515 } | 1515 } |
| (...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1655 _, sub := e.genTopicAndSubNames(c, taskManagerName, publisher) | 1655 _, sub := e.genTopicAndSubNames(c, taskManagerName, publisher) |
| 1656 msg, ack, err := pullSubcription(c, sub, "") | 1656 msg, ack, err := pullSubcription(c, sub, "") |
| 1657 if err != nil { | 1657 if err != nil { |
| 1658 return err | 1658 return err |
| 1659 } | 1659 } |
| 1660 if msg == nil { | 1660 if msg == nil { |
| 1661 logging.Infof(c, "No new PubSub messages") | 1661 logging.Infof(c, "No new PubSub messages") |
| 1662 return nil | 1662 return nil |
| 1663 } | 1663 } |
| 1664 err = e.handlePubSubMessage(c, msg) | 1664 err = e.handlePubSubMessage(c, msg) |
| 1665 » if err == nil || !errors.IsTransient(err) { | 1665 » if err == nil || !retry.Tag.In(err) { |
| 1666 ack() // ack only on success of fatal errors (to stop redelivery
) | 1666 ack() // ack only on success of fatal errors (to stop redelivery
) |
| 1667 } | 1667 } |
| 1668 return err | 1668 return err |
| 1669 } | 1669 } |
| 1670 | 1670 |
| 1671 func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe
ssage) error { | 1671 func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe
ssage) error { |
| 1672 logging.Infof(c, "Received PubSub message %q", msg.MessageId) | 1672 logging.Infof(c, "Received PubSub message %q", msg.MessageId) |
| 1673 | 1673 |
| 1674 // Extract Job and Invocation ID from validated auth_token. | 1674 // Extract Job and Invocation ID from validated auth_token. |
| 1675 var jobID string | 1675 var jobID string |
| (...skipping 30 matching lines...) Expand all Loading... |
| 1706 ctl, err := e.controllerForInvocation(c, inv) | 1706 ctl, err := e.controllerForInvocation(c, inv) |
| 1707 if err != nil { | 1707 if err != nil { |
| 1708 logging.Errorf(c, "Cannot get controller - %s", err) | 1708 logging.Errorf(c, "Cannot get controller - %s", err) |
| 1709 return err | 1709 return err |
| 1710 } | 1710 } |
| 1711 | 1711 |
| 1712 // Hand the message to the TaskManager. | 1712 // Hand the message to the TaskManager. |
| 1713 err = ctl.manager.HandleNotification(c, ctl, msg) | 1713 err = ctl.manager.HandleNotification(c, ctl, msg) |
| 1714 if err != nil { | 1714 if err != nil { |
| 1715 logging.Errorf(c, "Error when handling the message - %s", err) | 1715 logging.Errorf(c, "Error when handling the message - %s", err) |
| 1716 » » if !errors.IsTransient(err) && ctl.State().Status != task.Status
Failed { | 1716 » » if !retry.Tag.In(err) && ctl.State().Status != task.StatusFailed
{ |
| 1717 ctl.DebugLog("Fatal error when handling PubSub notificat
ion, aborting invocation - %s", err) | 1717 ctl.DebugLog("Fatal error when handling PubSub notificat
ion, aborting invocation - %s", err) |
| 1718 ctl.State().Status = task.StatusFailed | 1718 ctl.State().Status = task.StatusFailed |
| 1719 } | 1719 } |
| 1720 } | 1720 } |
| 1721 | 1721 |
| 1722 // Save anyway, to preserve the invocation log. | 1722 // Save anyway, to preserve the invocation log. |
| 1723 saveErr := ctl.Save(c) | 1723 saveErr := ctl.Save(c) |
| 1724 if saveErr != nil { | 1724 if saveErr != nil { |
| 1725 logging.Errorf(c, "Error when saving the invocation - %s", saveE
rr) | 1725 logging.Errorf(c, "Error when saving the invocation - %s", saveE
rr) |
| 1726 } | 1726 } |
| 1727 | 1727 |
| 1728 // Retry the delivery if at least one error is transient. HandleNotifica
tion | 1728 // Retry the delivery if at least one error is transient. HandleNotifica
tion |
| 1729 // must be idempotent. | 1729 // must be idempotent. |
| 1730 switch { | 1730 switch { |
| 1731 case err == nil && saveErr == nil: | 1731 case err == nil && saveErr == nil: |
| 1732 return nil | 1732 return nil |
| 1733 » case errors.IsTransient(saveErr): | 1733 » case retry.Tag.In(saveErr): |
| 1734 return saveErr | 1734 return saveErr |
| 1735 default: | 1735 default: |
| 1736 return err // transient or fatal | 1736 return err // transient or fatal |
| 1737 } | 1737 } |
| 1738 } | 1738 } |
| 1739 | 1739 |
| 1740 //////////////////////////////////////////////////////////////////////////////// | 1740 //////////////////////////////////////////////////////////////////////////////// |
| 1741 // TaskController. | 1741 // TaskController. |
| 1742 | 1742 |
| 1743 type taskController struct { | 1743 type taskController struct { |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1824 } | 1824 } |
| 1825 | 1825 |
| 1826 // Save is part of task.Controller interface. | 1826 // Save is part of task.Controller interface. |
| 1827 func (ctl *taskController) Save(ctx context.Context) error { | 1827 func (ctl *taskController) Save(ctx context.Context) error { |
| 1828 return ctl.saveImpl(ctx, true) | 1828 return ctl.saveImpl(ctx, true) |
| 1829 } | 1829 } |
| 1830 | 1830 |
| 1831 // errUpdateConflict means Invocation is being modified by two TaskController's | 1831 // errUpdateConflict means Invocation is being modified by two TaskController's |
| 1832 // concurrently. It should not be happening often. If it happens, task queue | 1832 // concurrently. It should not be happening often. If it happens, task queue |
| 1833 // call is retried to rerun the two-part transaction from scratch. | 1833 // call is retried to rerun the two-part transaction from scratch. |
| 1834 var errUpdateConflict = errors.WrapTransient(errors.New("concurrent modification
s of single Invocation")) | 1834 var errUpdateConflict = errors.New("concurrent modifications of single Invocatio
n", retry.Tag) |
| 1835 | 1835 |
| 1836 // saveImpl uploads updated Invocation to the datastore. If updateJob is true, | 1836 // saveImpl uploads updated Invocation to the datastore. If updateJob is true, |
| 1837 // it will also roll corresponding state machine forward. | 1837 // it will also roll corresponding state machine forward. |
| 1838 func (ctl *taskController) saveImpl(ctx context.Context, updateJob bool) (err er
ror) { | 1838 func (ctl *taskController) saveImpl(ctx context.Context, updateJob bool) (err er
ror) { |
| 1839 // Mutate copy in case transaction below fails. Also unpacks ctl.state b
ack | 1839 // Mutate copy in case transaction below fails. Also unpacks ctl.state b
ack |
| 1840 // into the entity (reverse of 'populateState'). | 1840 // into the entity (reverse of 'populateState'). |
| 1841 saving := ctl.saved | 1841 saving := ctl.saved |
| 1842 saving.Status = ctl.state.Status | 1842 saving.Status = ctl.state.Status |
| 1843 saving.TaskData = append([]byte(nil), ctl.state.TaskData...) | 1843 saving.TaskData = append([]byte(nil), ctl.state.TaskData...) |
| 1844 saving.ViewURL = ctl.state.ViewURL | 1844 saving.ViewURL = ctl.state.ViewURL |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1880 // expect it to be. | 1880 // expect it to be. |
| 1881 mostRecent := Invocation{ | 1881 mostRecent := Invocation{ |
| 1882 ID: saving.ID, | 1882 ID: saving.ID, |
| 1883 JobKey: saving.JobKey, | 1883 JobKey: saving.JobKey, |
| 1884 } | 1884 } |
| 1885 switch err := ds.Get(c, &mostRecent); { | 1885 switch err := ds.Get(c, &mostRecent); { |
| 1886 case err == ds.ErrNoSuchEntity: // should not happen | 1886 case err == ds.ErrNoSuchEntity: // should not happen |
| 1887 logging.Errorf(c, "Invocation is suddenly gone") | 1887 logging.Errorf(c, "Invocation is suddenly gone") |
| 1888 return errors.New("invocation is suddenly gone") | 1888 return errors.New("invocation is suddenly gone") |
| 1889 case err != nil: | 1889 case err != nil: |
| 1890 » » » return errors.WrapTransient(err) | 1890 » » » return retry.Tag.Apply(err) |
| 1891 } | 1891 } |
| 1892 | 1892 |
| 1893 // Make sure no one touched it while we were handling the invoca
tion. | 1893 // Make sure no one touched it while we were handling the invoca
tion. |
| 1894 if saving.MutationsCount != mostRecent.MutationsCount+1 { | 1894 if saving.MutationsCount != mostRecent.MutationsCount+1 { |
| 1895 logging.Errorf(c, "Invocation was modified by someone el
se while we were handling it") | 1895 logging.Errorf(c, "Invocation was modified by someone el
se while we were handling it") |
| 1896 return errUpdateConflict | 1896 return errUpdateConflict |
| 1897 } | 1897 } |
| 1898 | 1898 |
| 1899 // Store the invocation entity and schedule invocation timers re
gardless of | 1899 // Store the invocation entity and schedule invocation timers re
gardless of |
| 1900 // the current state of the Job entity. The table of all invocat
ions is | 1900 // the current state of the Job entity. The table of all invocat
ions is |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1937 } | 1937 } |
| 1938 if hasFinished { | 1938 if hasFinished { |
| 1939 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err
or { | 1939 return ctl.eng.rollSM(c, job, func(sm *StateMachine) err
or { |
| 1940 sm.OnInvocationDone(saving.ID) | 1940 sm.OnInvocationDone(saving.ID) |
| 1941 return nil | 1941 return nil |
| 1942 }) | 1942 }) |
| 1943 } | 1943 } |
| 1944 return nil | 1944 return nil |
| 1945 }) | 1945 }) |
| 1946 } | 1946 } |
| OLD | NEW |