| Index: build_scheduler/go/db/local_db/local_db.go
|
| diff --git a/build_scheduler/go/db/local_db/local_db.go b/build_scheduler/go/db/local_db/local_db.go
|
| deleted file mode 100644
|
| index 97e351a86c43c31925fb4bc79c70020e71f36195..0000000000000000000000000000000000000000
|
| --- a/build_scheduler/go/db/local_db/local_db.go
|
| +++ /dev/null
|
| @@ -1,529 +0,0 @@
|
| -package local_db
|
| -
|
| -import (
|
| - "bytes"
|
| - "encoding/binary"
|
| - "encoding/gob"
|
| - "fmt"
|
| - "net/http"
|
| - "sort"
|
| - "strconv"
|
| - "strings"
|
| - "sync"
|
| - "time"
|
| -
|
| - "github.com/boltdb/bolt"
|
| - "github.com/gorilla/mux"
|
| - "github.com/skia-dev/glog"
|
| - "go.skia.org/infra/build_scheduler/go/db"
|
| - "go.skia.org/infra/go/boltutil"
|
| - "go.skia.org/infra/go/httputils"
|
| - "go.skia.org/infra/go/metrics2"
|
| - "go.skia.org/infra/go/util"
|
| -)
|
| -
|
| -const (
|
| -	// BUCKET_TASKS is the name of the Tasks bucket. Key is Task.Id, which is set
|
| -	// to (creation time, sequence number) (see formatId for details); the value
|
| -	// is described in docs for BUCKET_TASKS_VERSION. Tasks will be updated in place.
|
| - // All repos share the same bucket.
|
| - BUCKET_TASKS = "tasks"
|
| - // BUCKET_TASKS_FILL_PERCENT is the value to set for bolt.Bucket.FillPercent
|
| - // for BUCKET_TASKS. BUCKET_TASKS will be append-mostly, so use a high fill
|
| - // percent.
|
| - BUCKET_TASKS_FILL_PERCENT = 0.9
|
| - // BUCKET_TASKS_VERSION indicates the format of the value of BUCKET_TASKS
|
| - // written by PutTasks. Retrieving Tasks from the DB must support all previous
|
| - // versions. For all versions, the first byte is the version number.
|
| - // Version 1: v[0] = 1; v[1:9] is the modified time as UnixNano encoded as
|
| - // big endian; v[9:] is the GOB of the Task.
|
| - BUCKET_TASKS_VERSION = 1
|
| -
|
| - // TIMESTAMP_FORMAT is a format string passed to Time.Format and time.Parse to
|
| - // format/parse the timestamp in the Task ID. It is similar to
|
| - // util.RFC3339NanoZeroPad, but since Task.Id can not contain colons, we omit
|
| - // most of the punctuation. This timestamp can only be used to format and
|
| - // parse times in UTC.
|
| - TIMESTAMP_FORMAT = "20060102T150405.000000000Z"
|
| - // SEQUENCE_NUMBER_FORMAT is a format string passed to fmt.Sprintf or
|
| - // fmt.Sscanf to format/parse the sequence number in the Task ID. It is a
|
| -	// 16-digit zero-padded lowercase hexadecimal number.
|
| - SEQUENCE_NUMBER_FORMAT = "%016x"
|
| -
|
| - // MAX_CREATED_TIME_SKEW is the maximum difference between the timestamp in a
|
| - // Task's Id field and that Task's Created field. This allows AssignId to be
|
| - // called before creating the Swarming task so that the Id can be included in
|
| - // the Swarming task tags. GetTasksFromDateRange accounts for this skew when
|
| - // retrieving tasks. This value can be increased in the future, but can never
|
| - // be decreased.
|
| - //
|
| - // 6 minutes is based on httputils.DIAL_TIMEOUT + httputils.REQUEST_TIMEOUT,
|
| - // which is assumed to be the approximate maximum duration of a successful
|
| - // swarming.ApiClient.TriggerTask() call.
|
| - MAX_CREATED_TIME_SKEW = 6 * time.Minute
|
| -)
|
| -
|
| -// formatId returns the timestamp and sequence number formatted for a Task ID.
|
| -// Format is "<timestamp>_<sequence_num>", where the timestamp is formatted
|
| -// using TIMESTAMP_FORMAT and sequence_num is formatted using
|
| -// SEQUENCE_NUMBER_FORMAT.
|
| -func formatId(t time.Time, seq uint64) string {
|
| - t = t.UTC()
|
| - return fmt.Sprintf("%s_"+SEQUENCE_NUMBER_FORMAT, t.Format(TIMESTAMP_FORMAT), seq)
|
| -}
|
| -
|
| -// parseId returns the timestamp and sequence number stored in a Task ID.
|
| -func parseId(id string) (time.Time, uint64, error) {
|
| - parts := strings.Split(id, "_")
|
| - if len(parts) != 2 {
|
| - return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id)
|
| - }
|
| - t, err := time.Parse(TIMESTAMP_FORMAT, parts[0])
|
| - if err != nil {
|
| - return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, err)
|
| - }
|
| - var seq uint64
|
| - // Add newlines to force Sscanf to match the entire string. Otherwise
|
| - // "123hello" will be parsed as 123. Note that Sscanf does not require 16
|
| - // digits even though SEQUENCE_NUMBER_FORMAT specifies padding to 16 digits.
|
| - i, err := fmt.Sscanf(parts[1]+"\n", SEQUENCE_NUMBER_FORMAT+"\n", &seq)
|
| - if err != nil {
|
| - return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, err)
|
| - } else if i != 1 {
|
| - return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected one hex number in %s, got %d", id, parts[1], i)
|
| - }
|
| - return t, seq, nil
|
| -}
|
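| // Illustrative sketch, not part of the original file (the helper name is
| // hypothetical): a Task ID round trip through formatId and parseId, using
| // TIMESTAMP_FORMAT and SEQUENCE_NUMBER_FORMAT as defined above.
| func sketchTaskIdRoundTrip() {
| 	id := formatId(time.Date(2016, 3, 15, 14, 30, 0, 0, time.UTC), 42)
| 	// id == "20160315T143000.000000000Z_000000000000002a"
| 	ts, seq, err := parseId(id)
| 	// ts == 2016-03-15 14:30:00 UTC, seq == 42, err == nil.
| 	_, _, _ = ts, seq, err
| }
|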
| -
|
| -// packV1 creates a value as described for BUCKET_TASKS_VERSION = 1. t is the
|
| -// modified time and serialized is the GOB of the Task.
|
| -func packV1(t time.Time, serialized []byte) []byte {
|
| - rv := make([]byte, len(serialized)+9)
|
| - rv[0] = 1
|
| - binary.BigEndian.PutUint64(rv[1:9], uint64(t.UnixNano()))
|
| - copy(rv[9:], serialized)
|
| - return rv
|
| -}
|
| -
|
| -// unpackV1 gets the modified time and GOB of the Task from a value as described
|
| -// by BUCKET_TASKS_VERSION = 1. The returned GOB shares structure with value.
|
| -func unpackV1(value []byte) (time.Time, []byte, error) {
|
| - if len(value) < 9 {
|
| - return time.Time{}, nil, fmt.Errorf("unpackV1 value is too short (%d bytes)", len(value))
|
| - }
|
| - if value[0] != 1 {
|
| - return time.Time{}, nil, fmt.Errorf("unpackV1 called for value with version %d", value[0])
|
| - }
|
| - t := time.Unix(0, int64(binary.BigEndian.Uint64(value[1:9]))).UTC()
|
| - return t, value[9:], nil
|
| -}
|
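| // Illustrative sketch, not part of the original file (the helper name is
| // hypothetical): the version-1 value layout, round-tripped through packV1
| // and unpackV1 as defined above.
| func sketchPackV1RoundTrip(now time.Time, taskGob []byte) {
| 	v := packV1(now, taskGob)
| 	// v[0] == 1; v[1:9] == big-endian uint64(now.UnixNano()); v[9:] == taskGob.
| 	modTs, serialized, err := unpackV1(v)
| 	// modTs.Equal(now), bytes.Equal(serialized, taskGob), and err == nil hold.
| 	_, _, _ = modTs, serialized, err
| }
|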
| -
|
| -// localDB accesses a local BoltDB database containing tasks.
|
| -type localDB struct {
|
| - // name is used in logging and metrics to identify this DB.
|
| - name string
|
| -
|
| - // db is the underlying BoltDB.
|
| - db *bolt.DB
|
| -
|
| - // tx fields contain metrics on the number of active transactions. Protected
|
| - // by txMutex.
|
| - txCount *metrics2.Counter
|
| - txNextId int64
|
| - txActive map[int64]string
|
| - txMutex sync.RWMutex
|
| -
|
| - dbMetric *boltutil.DbMetric
|
| -
|
| - modTasks db.ModifiedTasks
|
| -
|
| - // Close will send on each of these channels to indicate goroutines should
|
| - // stop.
|
| - notifyOnClose []chan bool
|
| -}
|
| -
|
| -// startTx monitors when a transaction starts.
|
| -func (d *localDB) startTx(name string) int64 {
|
| - d.txMutex.Lock()
|
| - defer d.txMutex.Unlock()
|
| - d.txCount.Inc(1)
|
| - id := d.txNextId
|
| - d.txActive[id] = name
|
| - d.txNextId++
|
| - return id
|
| -}
|
| -
|
| -// endTx monitors when a transaction ends.
|
| -func (d *localDB) endTx(id int64) {
|
| - d.txMutex.Lock()
|
| - defer d.txMutex.Unlock()
|
| - d.txCount.Dec(1)
|
| - delete(d.txActive, id)
|
| -}
|
| -
|
| -// reportActiveTx prints out the list of active transactions.
|
| -func (d *localDB) reportActiveTx() {
|
| - d.txMutex.RLock()
|
| - defer d.txMutex.RUnlock()
|
| - if len(d.txActive) == 0 {
|
| - glog.Infof("%s Active Transactions: (none)", d.name)
|
| - return
|
| - }
|
| - txs := make([]string, 0, len(d.txActive))
|
| - for id, name := range d.txActive {
|
| - txs = append(txs, fmt.Sprintf(" %d\t%s", id, name))
|
| - }
|
| - glog.Infof("%s Active Transactions:\n%s", d.name, strings.Join(txs, "\n"))
|
| -}
|
| -
|
| -// tx is a wrapper for a BoltDB transaction which tracks statistics.
|
| -func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error {
|
| - txId := d.startTx(name)
|
| - defer d.endTx(txId)
|
| - defer metrics2.NewTimer("db-tx-duration", map[string]string{
|
| - "database": d.name,
|
| - "transaction": name,
|
| - }).Stop()
|
| - if update {
|
| - return d.db.Update(fn)
|
| - } else {
|
| - return d.db.View(fn)
|
| - }
|
| -}
|
| -
|
| -// view is a wrapper for the BoltDB instance's View method.
|
| -func (d *localDB) view(name string, fn func(*bolt.Tx) error) error {
|
| - return d.tx(name, fn, false)
|
| -}
|
| -
|
| -// update is a wrapper for the BoltDB instance's Update method.
|
| -func (d *localDB) update(name string, fn func(*bolt.Tx) error) error {
|
| - return d.tx(name, fn, true)
|
| -}
|
| -
|
| -// Returns the tasks bucket with FillPercent set.
|
| -func tasksBucket(tx *bolt.Tx) *bolt.Bucket {
|
| - b := tx.Bucket([]byte(BUCKET_TASKS))
|
| - b.FillPercent = BUCKET_TASKS_FILL_PERCENT
|
| - return b
|
| -}
|
| -
|
| -// NewDB returns a local DB instance.
|
| -func NewDB(name, filename string) (db.DB, error) {
|
| - boltdb, err := bolt.Open(filename, 0600, nil)
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - d := &localDB{
|
| - name: name,
|
| - db: boltdb,
|
| - txCount: metrics2.GetCounter("db-active-tx", map[string]string{
|
| - "database": name,
|
| - }),
|
| - txNextId: 0,
|
| - txActive: map[int64]string{},
|
| - }
|
| -
|
| - stopReportActiveTx := make(chan bool)
|
| - d.notifyOnClose = append(d.notifyOnClose, stopReportActiveTx)
|
| - go func() {
|
| - t := time.NewTicker(time.Minute)
|
| - for {
|
| - select {
|
| - case <-stopReportActiveTx:
|
| - t.Stop()
|
| - return
|
| - case <-t.C:
|
| - d.reportActiveTx()
|
| - }
|
| - }
|
| - }()
|
| -
|
| - if err := d.update("NewDB", func(tx *bolt.Tx) error {
|
| - if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS)); err != nil {
|
| - return err
|
| - }
|
| - return nil
|
| - }); err != nil {
|
| - return nil, err
|
| - }
|
| -
|
| - if dbMetric, err := boltutil.NewDbMetric(boltdb, []string{BUCKET_TASKS}, map[string]string{"database": name}); err != nil {
|
| - return nil, err
|
| - } else {
|
| - d.dbMetric = dbMetric
|
| - }
|
| -
|
| - return d, nil
|
| -}
|
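| // Illustrative sketch, not part of the original file: a typical lifecycle of
| // this DB. The database name and file path are hypothetical.
| func sketchOpenAndClose() error {
| 	d, err := NewDB("task-scheduler-db", "/mnt/pd0/tasks.db")
| 	if err != nil {
| 		return err
| 	}
| 	// ... use d via the db.DB interface (PutTask, GetTaskById, etc.) ...
| 	return d.Close()
| }
|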
| -
|
| -// See docs for DB interface.
|
| -func (d *localDB) Close() error {
|
| - d.txMutex.Lock()
|
| - defer d.txMutex.Unlock()
|
| - if len(d.txActive) > 0 {
|
| - return fmt.Errorf("Can not close DB when transactions are active.")
|
| - }
|
| - for _, c := range d.notifyOnClose {
|
| - c <- true
|
| - }
|
| - d.txActive = map[int64]string{}
|
| - if err := d.dbMetric.Delete(); err != nil {
|
| - return err
|
| - }
|
| - d.dbMetric = nil
|
| - if err := d.txCount.Delete(); err != nil {
|
| - return err
|
| - }
|
| - d.txCount = nil
|
| - return d.db.Close()
|
| -}
|
| -
|
| -// Sets t.Id either based on t.Created or now. tx must be an update transaction.
|
| -func (d *localDB) assignId(tx *bolt.Tx, t *db.Task, now time.Time) error {
|
| - if t.Id != "" {
|
| - return fmt.Errorf("Task Id already assigned: %v", t.Id)
|
| - }
|
| - ts := now
|
| - if !util.TimeIsZero(t.Created) {
|
| - // TODO(benjaminwagner): Disallow assigning IDs based on t.Created; or
|
| - // ensure t.Created is > any ID ts in the DB.
|
| - ts = t.Created
|
| - }
|
| - seq, err := tasksBucket(tx).NextSequence()
|
| - if err != nil {
|
| - return err
|
| - }
|
| - t.Id = formatId(ts, seq)
|
| - return nil
|
| -}
|
| -
|
| -// See docs for DB interface.
|
| -func (d *localDB) AssignId(t *db.Task) error {
|
| - oldId := t.Id
|
| - err := d.update("AssignId", func(tx *bolt.Tx) error {
|
| - return d.assignId(tx, t, time.Now())
|
| - })
|
| - if err != nil {
|
| - t.Id = oldId
|
| - }
|
| - return err
|
| -}
|
| -
|
| -// See docs for DB interface.
|
| -func (d *localDB) GetTaskById(id string) (*db.Task, error) {
|
| - var rv *db.Task
|
| - if err := d.view("GetTaskById", func(tx *bolt.Tx) error {
|
| - value := tasksBucket(tx).Get([]byte(id))
|
| - if value == nil {
|
| - return nil
|
| - }
|
| - // Only BUCKET_TASKS_VERSION = 1 is implemented right now.
|
| - // TODO(benjaminwagner): Add functions "pack" and "unpack" that determine
|
| - // which version to use.
|
| - _, serialized, err := unpackV1(value)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - var t db.Task
|
| - if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&t); err != nil {
|
| - return err
|
| - }
|
| - rv = &t
|
| - return nil
|
| - }); err != nil {
|
| - return nil, err
|
| - }
|
| - if rv == nil {
|
| - // Return an error if id is invalid.
|
| - if _, _, err := parseId(id); err != nil {
|
| - return nil, err
|
| - }
|
| - }
|
| - return rv, nil
|
| -}
|
| -
|
| -// See docs for DB interface.
|
| -func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error) {
|
| - min := []byte(start.Add(-MAX_CREATED_TIME_SKEW).UTC().Format(TIMESTAMP_FORMAT))
|
| - max := []byte(end.UTC().Format(TIMESTAMP_FORMAT))
|
| - decoder := db.TaskDecoder{}
|
| - if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error {
|
| - c := tasksBucket(tx).Cursor()
|
| - for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
|
| - // Only BUCKET_TASKS_VERSION = 1 is implemented right now.
|
| - _, serialized, err := unpackV1(v)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - cpy := make([]byte, len(serialized))
|
| - copy(cpy, serialized)
|
| - if !decoder.Process(cpy) {
|
| - return nil
|
| - }
|
| - }
|
| - return nil
|
| - }); err != nil {
|
| - return nil, err
|
| - }
|
| - result, err := decoder.Result()
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| - sort.Sort(db.TaskSlice(result))
|
| - // The Tasks retrieved based on Id timestamp may include Tasks with Created
|
| - // time before/after the desired range.
|
| - // TODO(benjaminwagner): Biased binary search might be faster.
|
| - startIdx := 0
|
| - for startIdx < len(result) && result[startIdx].Created.Before(start) {
|
| - startIdx++
|
| - }
|
| - endIdx := len(result)
|
| - for endIdx > 0 && !result[endIdx-1].Created.Before(end) {
|
| - endIdx--
|
| - }
|
| - return result[startIdx:endIdx], nil
|
| -}
|
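| // Illustrative note, not part of the original file: a worked instance of the
| // MAX_CREATED_TIME_SKEW widening above. A Task whose Id key sorts at 10:00:00
| // may have Created as late as 10:06:00, so a query for [10:04:00, 10:06:00)
| // must begin its key scan at 10:04:00 - 6m = 09:58:00 to visit that key; the
| // trailing filter on Created then keeps only Tasks created within the range.
|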
| -
|
| -// See documentation for DB interface.
|
| -func (d *localDB) PutTask(t *db.Task) error {
|
| - return d.PutTasks([]*db.Task{t})
|
| -}
|
| -
|
| -// validate returns an error if the task can not be inserted into the DB. Does
|
| -// not modify t.
|
| -func (d *localDB) validate(t *db.Task) error {
|
| - if util.TimeIsZero(t.Created) {
|
| - return fmt.Errorf("Created not set. Task %s created time is %s. %v", t.Id, t.Created, t)
|
| - }
|
| - if t.Id != "" {
|
| - idTs, _, err := parseId(t.Id)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - if t.Created.Sub(idTs) > MAX_CREATED_TIME_SKEW {
|
| - return fmt.Errorf("Created too late. Task %s was assigned Id at %s which is %s before Created time %s, more than MAX_CREATED_TIME_SKEW = %s.", t.Id, idTs, t.Created.Sub(idTs), t.Created, MAX_CREATED_TIME_SKEW)
|
| - }
|
| - if t.Created.Before(idTs) {
|
| - return fmt.Errorf("Created too early. Task %s Created time was changed or set to %s after Id assigned at %s.", t.Id, t.Created, idTs)
|
| - }
|
| - }
|
| - return nil
|
| -}
|
| -
|
| -// See documentation for DB interface.
|
| -func (d *localDB) PutTasks(tasks []*db.Task) error {
|
| - // If there is an error during the transaction, we should leave the tasks
|
| - // unchanged. Save the old Ids and DbModified times since we set them below.
|
| - type savedData struct {
|
| - Id string
|
| - DbModified time.Time
|
| - }
|
| - oldData := make([]savedData, 0, len(tasks))
|
| - // Validate and save current data.
|
| - for _, t := range tasks {
|
| - if err := d.validate(t); err != nil {
|
| - return err
|
| - }
|
| - oldData = append(oldData, savedData{
|
| - Id: t.Id,
|
| - DbModified: t.DbModified,
|
| - })
|
| - }
|
| - revertChanges := func() {
|
| - for i, data := range oldData {
|
| - tasks[i].Id = data.Id
|
| - tasks[i].DbModified = data.DbModified
|
| - }
|
| - }
|
| - gobs := make(map[string][]byte, len(tasks))
|
| - err := d.update("PutTasks", func(tx *bolt.Tx) error {
|
| - bucket := tasksBucket(tx)
|
| - // Assign Ids and encode.
|
| - e := db.TaskEncoder{}
|
| - now := time.Now().UTC()
|
| - for _, t := range tasks {
|
| - if t.Id == "" {
|
| - if err := d.assignId(tx, t, now); err != nil {
|
| - return err
|
| - }
|
| - } else {
|
| - if value := bucket.Get([]byte(t.Id)); value != nil {
|
| - modTs, serialized, err := unpackV1(value)
|
| - if err != nil {
|
| - return err
|
| - }
|
| - if !modTs.Equal(t.DbModified) {
|
| - var existing db.Task
|
| - if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&existing); err != nil {
|
| - return err
|
| - }
|
| - glog.Warningf("Cached Task has been modified in the DB. Current:\n%#v\nCached:\n%#v", existing, t)
|
| - return db.ErrConcurrentUpdate
|
| - }
|
| - }
|
| - }
|
| - t.DbModified = now
|
| - e.Process(t)
|
| - }
|
| - // Insert/update.
|
| - for {
|
| - t, serialized, err := e.Next()
|
| - if err != nil {
|
| - return err
|
| - }
|
| - if t == nil {
|
| - break
|
| - }
|
| - gobs[t.Id] = serialized
|
| - // BUCKET_TASKS_VERSION = 1
|
| - value := packV1(now, serialized)
|
| - if err := bucket.Put([]byte(t.Id), value); err != nil {
|
| - return err
|
| - }
|
| - }
|
| - return nil
|
| - })
|
| - if err != nil {
|
| - revertChanges()
|
| - return err
|
| - } else {
|
| - d.modTasks.TrackModifiedTasksGOB(gobs)
|
| - }
|
| - return nil
|
| -}
|
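| // Illustrative sketch, not part of the original file (the helper name is
| // hypothetical): how the DbModified check above rejects stale writes.
| func sketchConcurrentUpdate(d db.DB, id string) error {
| 	a, _ := d.GetTaskById(id) // a.DbModified matches the stored modified time.
| 	b, _ := d.GetTaskById(id) // Independent copy of the same Task.
| 	if err := d.PutTask(a); err != nil { // Succeeds; stored modified time advances.
| 		return err
| 	}
| 	return d.PutTask(b) // b.DbModified is now stale: returns db.ErrConcurrentUpdate.
| }
|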
| -
|
| -// See docs for DB interface.
|
| -func (d *localDB) GetModifiedTasks(id string) ([]*db.Task, error) {
|
| - return d.modTasks.GetModifiedTasks(id)
|
| -}
|
| -
|
| -// See docs for DB interface.
|
| -func (d *localDB) StartTrackingModifiedTasks() (string, error) {
|
| - return d.modTasks.StartTrackingModifiedTasks()
|
| -}
|
| -
|
| -// See docs for DB interface.
|
| -func (d *localDB) StopTrackingModifiedTasks(id string) {
|
| - d.modTasks.StopTrackingModifiedTasks(id)
|
| -}
|
| -
|
| -// RunBackupServer runs an HTTP server which provides downloadable database
|
| -// backups.
|
| -func (d *localDB) RunBackupServer(port string) error {
|
| - r := mux.NewRouter()
|
| - r.HandleFunc("/backup", func(w http.ResponseWriter, r *http.Request) {
|
| - if err := d.view("Backup", func(tx *bolt.Tx) error {
|
| - w.Header().Set("Content-Type", "application/octet-stream")
|
| - w.Header().Set("Content-Disposition", "attachment; filename=\"tasks.db\"")
|
| - w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size())))
|
| - _, err := tx.WriteTo(w)
|
| - return err
|
| - }); err != nil {
|
| - glog.Errorf("Failed to create DB backup: %s", err)
|
| - httputils.ReportError(w, r, err, "Failed to create DB backup")
|
| - }
|
| - })
|
| - http.Handle("/", httputils.LoggingGzipRequestResponse(r))
|
| - return http.ListenAndServe(port, nil)
|
| -}
|
|
|