Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1371)

Unified Diff: build_scheduler/go/db/local_db/local_db.go

Issue 2296763008: [task scheduler] Move files from build_scheduler/ to task_scheduler/ (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « build_scheduler/go/db/local_db/busywork/main.go ('k') | build_scheduler/go/db/local_db/local_db_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: build_scheduler/go/db/local_db/local_db.go
diff --git a/build_scheduler/go/db/local_db/local_db.go b/build_scheduler/go/db/local_db/local_db.go
deleted file mode 100644
index 97e351a86c43c31925fb4bc79c70020e71f36195..0000000000000000000000000000000000000000
--- a/build_scheduler/go/db/local_db/local_db.go
+++ /dev/null
@@ -1,529 +0,0 @@
-package local_db
-
-import (
- "bytes"
- "encoding/binary"
- "encoding/gob"
- "fmt"
- "net/http"
- "sort"
- "strconv"
- "strings"
- "sync"
- "time"
-
- "github.com/boltdb/bolt"
- "github.com/gorilla/mux"
- "github.com/skia-dev/glog"
- "go.skia.org/infra/build_scheduler/go/db"
- "go.skia.org/infra/go/boltutil"
- "go.skia.org/infra/go/httputils"
- "go.skia.org/infra/go/metrics2"
- "go.skia.org/infra/go/util"
-)
-
-const (
- // BUCKET_TASKS is the name of the Tasks bucket. Key is Task.Id, which is set
- // to (creation time, sequence number) (see formatId for detail), value is
- // described in docs for BUCKET_TASKS_VERSION. Tasks will be updated in place.
- // All repos share the same bucket.
- BUCKET_TASKS = "tasks"
- // BUCKET_TASKS_FILL_PERCENT is the value to set for bolt.Bucket.FillPercent
- // for BUCKET_TASKS. BUCKET_TASKS will be append-mostly, so use a high fill
- // percent.
- BUCKET_TASKS_FILL_PERCENT = 0.9
- // BUCKET_TASKS_VERSION indicates the format of the value of BUCKET_TASKS
- // written by PutTasks. Retrieving Tasks from the DB must support all previous
- // versions. For all versions, the first byte is the version number.
- // Version 1: v[0] = 1; v[1:9] is the modified time as UnixNano encoded as
- // big endian; v[9:] is the GOB of the Task.
- BUCKET_TASKS_VERSION = 1
-
- // TIMESTAMP_FORMAT is a format string passed to Time.Format and time.Parse to
- // format/parse the timestamp in the Task ID. It is similar to
- // util.RFC3339NanoZeroPad, but since Task.Id can not contain colons, we omit
- // most of the punctuation. This timestamp can only be used to format and
- // parse times in UTC.
- TIMESTAMP_FORMAT = "20060102T150405.000000000Z"
- // SEQUENCE_NUMBER_FORMAT is a format string passed to fmt.Sprintf or
- // fmt.Sscanf to format/parse the sequence number in the Task ID. It is a
- // 16-digit zero-padded lowercase hexidecimal number.
- SEQUENCE_NUMBER_FORMAT = "%016x"
-
- // MAX_CREATED_TIME_SKEW is the maximum difference between the timestamp in a
- // Task's Id field and that Task's Created field. This allows AssignId to be
- // called before creating the Swarming task so that the Id can be included in
- // the Swarming task tags. GetTasksFromDateRange accounts for this skew when
- // retrieving tasks. This value can be increased in the future, but can never
- // be decreased.
- //
- // 6 minutes is based on httputils.DIAL_TIMEOUT + httputils.REQUEST_TIMEOUT,
- // which is assumed to be the approximate maximum duration of a successful
- // swarming.ApiClient.TriggerTask() call.
- MAX_CREATED_TIME_SKEW = 6 * time.Minute
-)
-
-// formatId returns the timestamp and sequence number formatted for a Task ID.
-// Format is "<timestamp>_<sequence_num>", where the timestamp is formatted
-// using TIMESTAMP_FORMAT and sequence_num is formatted using
-// SEQUENCE_NUMBER_FORMAT.
-func formatId(t time.Time, seq uint64) string {
- t = t.UTC()
- return fmt.Sprintf("%s_"+SEQUENCE_NUMBER_FORMAT, t.Format(TIMESTAMP_FORMAT), seq)
-}
-
-// parseId returns the timestamp and sequence number stored in a Task ID.
-func parseId(id string) (time.Time, uint64, error) {
- parts := strings.Split(id, "_")
- if len(parts) != 2 {
- return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id)
- }
- t, err := time.Parse(TIMESTAMP_FORMAT, parts[0])
- if err != nil {
- return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, err)
- }
- var seq uint64
- // Add newlines to force Sscanf to match the entire string. Otherwise
- // "123hello" will be parsed as 123. Note that Sscanf does not require 16
- // digits even though SEQUENCE_NUMBER_FORMAT specifies padding to 16 digits.
- i, err := fmt.Sscanf(parts[1]+"\n", SEQUENCE_NUMBER_FORMAT+"\n", &seq)
- if err != nil {
- return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, err)
- } else if i != 1 {
- return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected one hex number in %s, got %d", id, parts[1], i)
- }
- return t, seq, nil
-}
-
-// packV1 creates a value as described for BUCKET_TASKS_VERSION = 1. t is the
-// modified time and serialized is the GOB of the Task.
-func packV1(t time.Time, serialized []byte) []byte {
- rv := make([]byte, len(serialized)+9)
- rv[0] = 1
- binary.BigEndian.PutUint64(rv[1:9], uint64(t.UnixNano()))
- copy(rv[9:], serialized)
- return rv
-}
-
-// unpackV1 gets the modified time and GOB of the Task from a value as described
-// by BUCKET_TASKS_VERSION = 1. The returned GOB shares structure with value.
-func unpackV1(value []byte) (time.Time, []byte, error) {
- if len(value) < 9 {
- return time.Time{}, nil, fmt.Errorf("unpackV1 value is too short (%d bytes)", len(value))
- }
- if value[0] != 1 {
- return time.Time{}, nil, fmt.Errorf("unpackV1 called for value with version %d", value[0])
- }
- t := time.Unix(0, int64(binary.BigEndian.Uint64(value[1:9]))).UTC()
- return t, value[9:], nil
-}
-
-// localDB accesses a local BoltDB database containing tasks.
-type localDB struct {
- // name is used in logging and metrics to identify this DB.
- name string
-
- // db is the underlying BoltDB.
- db *bolt.DB
-
- // tx fields contain metrics on the number of active transactions. Protected
- // by txMutex.
- txCount *metrics2.Counter
- txNextId int64
- txActive map[int64]string
- txMutex sync.RWMutex
-
- dbMetric *boltutil.DbMetric
-
- modTasks db.ModifiedTasks
-
- // Close will send on each of these channels to indicate goroutines should
- // stop.
- notifyOnClose []chan bool
-}
-
-// startTx monitors when a transaction starts.
-func (d *localDB) startTx(name string) int64 {
- d.txMutex.Lock()
- defer d.txMutex.Unlock()
- d.txCount.Inc(1)
- id := d.txNextId
- d.txActive[id] = name
- d.txNextId++
- return id
-}
-
-// endTx monitors when a transaction ends.
-func (d *localDB) endTx(id int64) {
- d.txMutex.Lock()
- defer d.txMutex.Unlock()
- d.txCount.Dec(1)
- delete(d.txActive, id)
-}
-
-// reportActiveTx prints out the list of active transactions.
-func (d *localDB) reportActiveTx() {
- d.txMutex.RLock()
- defer d.txMutex.RUnlock()
- if len(d.txActive) == 0 {
- glog.Infof("%s Active Transactions: (none)", d.name)
- return
- }
- txs := make([]string, 0, len(d.txActive))
- for id, name := range d.txActive {
- txs = append(txs, fmt.Sprintf(" %d\t%s", id, name))
- }
- glog.Infof("%s Active Transactions:\n%s", d.name, strings.Join(txs, "\n"))
-}
-
-// tx is a wrapper for a BoltDB transaction which tracks statistics.
-func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error {
- txId := d.startTx(name)
- defer d.endTx(txId)
- defer metrics2.NewTimer("db-tx-duration", map[string]string{
- "database": d.name,
- "transaction": name,
- }).Stop()
- if update {
- return d.db.Update(fn)
- } else {
- return d.db.View(fn)
- }
-}
-
-// view is a wrapper for the BoltDB instance's View method.
-func (d *localDB) view(name string, fn func(*bolt.Tx) error) error {
- return d.tx(name, fn, false)
-}
-
-// update is a wrapper for the BoltDB instance's Update method.
-func (d *localDB) update(name string, fn func(*bolt.Tx) error) error {
- return d.tx(name, fn, true)
-}
-
-// Returns the tasks bucket with FillPercent set.
-func tasksBucket(tx *bolt.Tx) *bolt.Bucket {
- b := tx.Bucket([]byte(BUCKET_TASKS))
- b.FillPercent = BUCKET_TASKS_FILL_PERCENT
- return b
-}
-
-// NewDB returns a local DB instance.
-func NewDB(name, filename string) (db.DB, error) {
- boltdb, err := bolt.Open(filename, 0600, nil)
- if err != nil {
- return nil, err
- }
- d := &localDB{
- name: name,
- db: boltdb,
- txCount: metrics2.GetCounter("db-active-tx", map[string]string{
- "database": name,
- }),
- txNextId: 0,
- txActive: map[int64]string{},
- }
-
- stopReportActiveTx := make(chan bool)
- d.notifyOnClose = append(d.notifyOnClose, stopReportActiveTx)
- go func() {
- t := time.NewTicker(time.Minute)
- for {
- select {
- case <-stopReportActiveTx:
- t.Stop()
- return
- case <-t.C:
- d.reportActiveTx()
- }
- }
- }()
-
- if err := d.update("NewDB", func(tx *bolt.Tx) error {
- if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS)); err != nil {
- return err
- }
- return nil
- }); err != nil {
- return nil, err
- }
-
- if dbMetric, err := boltutil.NewDbMetric(boltdb, []string{BUCKET_TASKS}, map[string]string{"database": name}); err != nil {
- return nil, err
- } else {
- d.dbMetric = dbMetric
- }
-
- return d, nil
-}
-
-// See docs for DB interface.
-func (d *localDB) Close() error {
- d.txMutex.Lock()
- defer d.txMutex.Unlock()
- if len(d.txActive) > 0 {
- return fmt.Errorf("Can not close DB when transactions are active.")
- }
- for _, c := range d.notifyOnClose {
- c <- true
- }
- d.txActive = map[int64]string{}
- if err := d.dbMetric.Delete(); err != nil {
- return err
- }
- d.dbMetric = nil
- if err := d.txCount.Delete(); err != nil {
- return err
- }
- d.txCount = nil
- return d.db.Close()
-}
-
-// Sets t.Id either based on t.Created or now. tx must be an update transaction.
-func (d *localDB) assignId(tx *bolt.Tx, t *db.Task, now time.Time) error {
- if t.Id != "" {
- return fmt.Errorf("Task Id already assigned: %v", t.Id)
- }
- ts := now
- if !util.TimeIsZero(t.Created) {
- // TODO(benjaminwagner): Disallow assigning IDs based on t.Created; or
- // ensure t.Created is > any ID ts in the DB.
- ts = t.Created
- }
- seq, err := tasksBucket(tx).NextSequence()
- if err != nil {
- return err
- }
- t.Id = formatId(ts, seq)
- return nil
-}
-
-// See docs for DB interface.
-func (d *localDB) AssignId(t *db.Task) error {
- oldId := t.Id
- err := d.update("AssignId", func(tx *bolt.Tx) error {
- return d.assignId(tx, t, time.Now())
- })
- if err != nil {
- t.Id = oldId
- }
- return err
-}
-
-// See docs for DB interface.
-func (d *localDB) GetTaskById(id string) (*db.Task, error) {
- var rv *db.Task
- if err := d.view("GetTaskById", func(tx *bolt.Tx) error {
- value := tasksBucket(tx).Get([]byte(id))
- if value == nil {
- return nil
- }
- // Only BUCKET_TASKS_VERSION = 1 is implemented right now.
- // TODO(benjaminwagner): Add functions "pack" and "unpack" that determine
- // which version to use.
- _, serialized, err := unpackV1(value)
- if err != nil {
- return err
- }
- var t db.Task
- if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&t); err != nil {
- return err
- }
- rv = &t
- return nil
- }); err != nil {
- return nil, err
- }
- if rv == nil {
- // Return an error if id is invalid.
- if _, _, err := parseId(id); err != nil {
- return nil, err
- }
- }
- return rv, nil
-}
-
-// See docs for DB interface.
-func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error) {
- min := []byte(start.Add(-MAX_CREATED_TIME_SKEW).UTC().Format(TIMESTAMP_FORMAT))
- max := []byte(end.UTC().Format(TIMESTAMP_FORMAT))
- decoder := db.TaskDecoder{}
- if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error {
- c := tasksBucket(tx).Cursor()
- for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
- // Only BUCKET_TASKS_VERSION = 1 is implemented right now.
- _, serialized, err := unpackV1(v)
- if err != nil {
- return err
- }
- cpy := make([]byte, len(serialized))
- copy(cpy, serialized)
- if !decoder.Process(cpy) {
- return nil
- }
- }
- return nil
- }); err != nil {
- return nil, err
- }
- result, err := decoder.Result()
- if err != nil {
- return nil, err
- }
- sort.Sort(db.TaskSlice(result))
- // The Tasks retrieved based on Id timestamp may include Tasks with Created
- // time before/after the desired range.
- // TODO(benjaminwagner): Biased binary search might be faster.
- startIdx := 0
- for startIdx < len(result) && result[startIdx].Created.Before(start) {
- startIdx++
- }
- endIdx := len(result)
- for endIdx > 0 && !result[endIdx-1].Created.Before(end) {
- endIdx--
- }
- return result[startIdx:endIdx], nil
-}
-
-// See documentation for DB interface.
-func (d *localDB) PutTask(t *db.Task) error {
- return d.PutTasks([]*db.Task{t})
-}
-
-// validate returns an error if the task can not be inserted into the DB. Does
-// not modify t.
-func (d *localDB) validate(t *db.Task) error {
- if util.TimeIsZero(t.Created) {
- return fmt.Errorf("Created not set. Task %s created time is %s. %v", t.Id, t.Created, t)
- }
- if t.Id != "" {
- idTs, _, err := parseId(t.Id)
- if err != nil {
- return err
- }
- if t.Created.Sub(idTs) > MAX_CREATED_TIME_SKEW {
- return fmt.Errorf("Created too late. Task %s was assigned Id at %s which is %s before Created time %s, more than MAX_CREATED_TIME_SKEW = %s.", t.Id, idTs, t.Created.Sub(idTs), t.Created, MAX_CREATED_TIME_SKEW)
- }
- if t.Created.Before(idTs) {
- return fmt.Errorf("Created too early. Task %s Created time was changed or set to %s after Id assigned at %s.", t.Id, t.Created, idTs)
- }
- }
- return nil
-}
-
-// See documentation for DB interface.
-func (d *localDB) PutTasks(tasks []*db.Task) error {
- // If there is an error during the transaction, we should leave the tasks
- // unchanged. Save the old Ids and DbModified times since we set them below.
- type savedData struct {
- Id string
- DbModified time.Time
- }
- oldData := make([]savedData, 0, len(tasks))
- // Validate and save current data.
- for _, t := range tasks {
- if err := d.validate(t); err != nil {
- return err
- }
- oldData = append(oldData, savedData{
- Id: t.Id,
- DbModified: t.DbModified,
- })
- }
- revertChanges := func() {
- for i, data := range oldData {
- tasks[i].Id = data.Id
- tasks[i].DbModified = data.DbModified
- }
- }
- gobs := make(map[string][]byte, len(tasks))
- err := d.update("PutTasks", func(tx *bolt.Tx) error {
- bucket := tasksBucket(tx)
- // Assign Ids and encode.
- e := db.TaskEncoder{}
- now := time.Now().UTC()
- for _, t := range tasks {
- if t.Id == "" {
- if err := d.assignId(tx, t, now); err != nil {
- return err
- }
- } else {
- if value := bucket.Get([]byte(t.Id)); value != nil {
- modTs, serialized, err := unpackV1(value)
- if err != nil {
- return err
- }
- if !modTs.Equal(t.DbModified) {
- var existing db.Task
- if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&existing); err != nil {
- return err
- }
- glog.Warningf("Cached Task has been modified in the DB. Current:\n%#v\nCached:\n%#v", existing, t)
- return db.ErrConcurrentUpdate
- }
- }
- }
- t.DbModified = now
- e.Process(t)
- }
- // Insert/update.
- for {
- t, serialized, err := e.Next()
- if err != nil {
- return err
- }
- if t == nil {
- break
- }
- gobs[t.Id] = serialized
- // BUCKET_TASKS_VERSION = 1
- value := packV1(now, serialized)
- if err := bucket.Put([]byte(t.Id), value); err != nil {
- return err
- }
- }
- return nil
- })
- if err != nil {
- revertChanges()
- return err
- } else {
- d.modTasks.TrackModifiedTasksGOB(gobs)
- }
- return nil
-}
-
-// See docs for DB interface.
-func (d *localDB) GetModifiedTasks(id string) ([]*db.Task, error) {
- return d.modTasks.GetModifiedTasks(id)
-}
-
-// See docs for DB interface.
-func (d *localDB) StartTrackingModifiedTasks() (string, error) {
- return d.modTasks.StartTrackingModifiedTasks()
-}
-
-// See docs for DB interface.
-func (d *localDB) StopTrackingModifiedTasks(id string) {
- d.modTasks.StopTrackingModifiedTasks(id)
-}
-
-// RunBackupServer runs an HTTP server which provides downloadable database
-// backups.
-func (d *localDB) RunBackupServer(port string) error {
- r := mux.NewRouter()
- r.HandleFunc("/backup", func(w http.ResponseWriter, r *http.Request) {
- if err := d.view("Backup", func(tx *bolt.Tx) error {
- w.Header().Set("Content-Type", "application/octet-stream")
- w.Header().Set("Content-Disposition", "attachment; filename=\"tasks.db\"")
- w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size())))
- _, err := tx.WriteTo(w)
- return err
- }); err != nil {
- glog.Errorf("Failed to create DB backup: %s", err)
- httputils.ReportError(w, r, err, "Failed to create DB backup")
- }
- })
- http.Handle("/", httputils.LoggingGzipRequestResponse(r))
- return http.ListenAndServe(port, nil)
-}
« no previous file with comments | « build_scheduler/go/db/local_db/busywork/main.go ('k') | build_scheduler/go/db/local_db/local_db_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698