Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2576)

Unified Diff: build_scheduler/go/db/local_db/local_db.go

Issue 2246933002: Add Task DB implementation using a local BoltDB. (Closed) Base URL: https://skia.googlesource.com/buildbot@taskdb-impl-track
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « build_scheduler/go/db/db_test.go ('k') | build_scheduler/go/db/local_db/local_db_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: build_scheduler/go/db/local_db/local_db.go
diff --git a/build_scheduler/go/db/local_db/local_db.go b/build_scheduler/go/db/local_db/local_db.go
new file mode 100644
index 0000000000000000000000000000000000000000..a47275215418212fffddda05b9a1af6623354233
--- /dev/null
+++ b/build_scheduler/go/db/local_db/local_db.go
@@ -0,0 +1,391 @@
+package local_db
+
+import (
+ "bytes"
+ "encoding/gob"
+ "fmt"
+ "net/http"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/boltdb/bolt"
+ "github.com/gorilla/mux"
+ "github.com/skia-dev/glog"
+ "go.skia.org/infra/build_scheduler/go/db"
+ "go.skia.org/infra/go/httputils"
+ "go.skia.org/infra/go/metrics2"
+ "go.skia.org/infra/go/util"
+)
+
+const (
+ // BUCKET_TASKS is the name of the BoltDB bucket holding tasks. Key is Task.Id,
+ // which is set to (creation time, sequence number) (see formatId for detail),
+ // value is the GOB of the task. Tasks are updated in place. All repos share it.
+ // TODO(benjaminwagner): May need to prefix value with metadata.
+ BUCKET_TASKS = "tasks"
+ // BUCKET_TASKS_FILL_PERCENT is high because BUCKET_TASKS is append-mostly; tasksBucket applies it.
+ BUCKET_TASKS_FILL_PERCENT = 0.9
+
+ // TIMESTAMP_FORMAT is the time layout of the timestamp part of Task.Id. It is
+ // similar to util.RFC3339NanoZeroPad, but since Task.Id can not contain colons, most
+ // punctuation is omitted. It can only be used to format and parse times in UTC.
+ TIMESTAMP_FORMAT = "20060102T150405.000000000Z"
+)
+
+// formatId returns the timestamp and sequence number formatted for a Task ID.
+// Format is "<encoded_timestamp>_<encoded_sequence_num>", where:
+// - encoded_timestamp is the timestamp formatted using TIMESTAMP_FORMAT
+// - encoded_sequence_num is
+// "<lexical_ordering_char><hexadecimal_sequence_number>"
+// - lexical_ordering_char is len(hexadecimal_sequence_number)-1 encoded as a
+// hexadecimal digit, to allow lexical ordering of IDs.
+// - hexadecimal_sequence_number is the sequence number formatted in
+// hexadecimal digits with no padding.
+// (Note that since the sequence number is uint64, the maximum possible value of
+// len(hexadecimal_sequence_number)-1 is 15, so it can always be encoded as a
+// single hexadecimal digit.)
borenet 2016/08/16 19:49:54 Why not just have this function zero-pad the seque
dogben 2016/08/17 13:29:44 You're right. I thought it was overkill to have th
+func formatId(t time.Time, seq uint64) string {
+	// Sequence number as lowercase hex digits, without padding.
+	hexSeq := strconv.FormatUint(seq, 16)
+	// One hex digit encoding len(hexSeq)-1 goes in front of the digits so that
+	// IDs with the same timestamp order lexically by sequence number.
+	orderChar := strconv.FormatInt(int64(len(hexSeq)-1), 16)
+	return t.UTC().Format(TIMESTAMP_FORMAT) + "_" + orderChar + hexSeq
+}
+
+// parseId returns the timestamp and sequence number stored in a Task ID.
+func parseId(id string) (time.Time, uint64, error) {
+ parts := strings.Split(id, "_")
+ if len(parts) != 2 {
+ return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id)
+ }
+ t, err := time.Parse(TIMESTAMP_FORMAT, parts[0])
+ if err != nil {
+ return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, err)
+ }
+ var seq uint64
+ // Skip the first character (lexicographic ordering character).
+ if len(parts[1]) < 2 {
+ return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected at least 2 hex digits in second part.", id)
+ }
+ i, err := fmt.Sscanf(parts[1][1:], "%x", &seq)
+ if err != nil {
+ return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, err)
+ } else if i != 1 {
+ return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected one hex number in %s, got %d", id, parts[1][1:], i)
+ }
+ return t, seq, nil
+}
+
+// localDB accesses a local BoltDB database containing tasks; NewDB constructs it.
+type localDB struct {
+ // name is used in logging and metrics to identify this DB.
+ name string
+
+ // db is the underlying BoltDB.
+ db *bolt.DB
+
+ // tx fields contain metrics on the number of active transactions. Protected
+ // by txMutex. They are maintained by startTx/endTx and read by reportActiveTx.
+ txCount *metrics2.Counter // incremented in startTx, decremented in endTx
+ txNextId int64 // id that startTx hands to the next transaction
+ txActive map[int64]string // transaction id -> name, for transactions started but not yet ended
+ txMutex sync.RWMutex
+
+ modTasks db.ModifiedTasks // fed by PutTasks; backs GetModifiedTasks and StartTrackingModifiedTasks
+}
+
+// startTx records the start of a transaction called name: it increments the
+// active-transaction counter, registers name in txActive under a fresh id, and
+// returns that id so the caller can later pass it to endTx.
+func (d *localDB) startTx(name string) int64 {
+	d.txMutex.Lock()
+	defer d.txMutex.Unlock()
+
+	txId := d.txNextId
+	d.txNextId++
+	d.txCount.Inc(1)
+	d.txActive[txId] = name
+	return txId
+}
+
+// endTx records the end of the transaction that was registered under id: the
+// entry is removed from txActive and the active-transaction counter is
+// decremented.
+func (d *localDB) endTx(id int64) {
+	d.txMutex.Lock()
+	defer d.txMutex.Unlock()
+
+	delete(d.txActive, id)
+	d.txCount.Dec(1)
+}
+
+// reportActiveTx logs the id and name of every transaction currently present
+// in txActive, or "(none)" when the map is empty. Entries are listed in map
+// iteration order.
+func (d *localDB) reportActiveTx() {
+	d.txMutex.RLock()
+	defer d.txMutex.RUnlock()
+
+	n := len(d.txActive)
+	if n == 0 {
+		glog.Infof("%s Active Transactions: (none)", d.name)
+		return
+	}
+	lines := make([]string, 0, n)
+	for txId, txName := range d.txActive {
+		lines = append(lines, fmt.Sprintf(" %d\t%s", txId, txName))
+	}
+	glog.Infof("%s Active Transactions:\n%s", d.name, strings.Join(lines, "\n"))
+}
+
+// tx runs fn inside a BoltDB transaction: db.Update when update is true,
+// db.View otherwise. The transaction is registered as active under name for
+// its duration and timed with the "db-tx-duration" metric.
+func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error {
+	txId := d.startTx(name)
+	defer d.endTx(txId)
+
+	timer := metrics2.NewTimer("db-tx-duration", map[string]string{
+		"database":    d.name,
+		"transaction": name,
+	})
+	// Deferred after endTx, so the timer is stopped before the transaction is
+	// unregistered.
+	defer timer.Stop()
+
+	if update {
+		return d.db.Update(fn)
+	}
+	return d.db.View(fn)
+}
+
+// view runs fn in a read-only BoltDB transaction (db.View) via tx, tracked and timed under name.
+func (d *localDB) view(name string, fn func(*bolt.Tx) error) error {
+ return d.tx(name, fn, false) // update=false selects db.View
+}
+
+// update runs fn in a read-write BoltDB transaction (db.Update) via tx, tracked and timed under name.
+func (d *localDB) update(name string, fn func(*bolt.Tx) error) error {
+ return d.tx(name, fn, true) // update=true selects db.Update
+}
+
+// tasksBucket returns the BUCKET_TASKS bucket of tx with FillPercent set to BUCKET_TASKS_FILL_PERCENT.
+func tasksBucket(tx *bolt.Tx) *bolt.Bucket {
+ b := tx.Bucket([]byte(BUCKET_TASKS)) // NOTE(review): presumably nil if the bucket is missing, which would panic on the next line; NewDB creates the bucket — confirm
+ b.FillPercent = BUCKET_TASKS_FILL_PERCENT
+ return b
+}
+
+// NewDB returns a local DB instance backed by the BoltDB file at filename
+// (opened with mode 0600). name identifies the DB in logs and metrics. The
+// tasks bucket is created if it does not exist yet, and a background goroutine
+// logs the list of active transactions once per minute.
+//
+// If the tasks bucket can not be created, the BoltDB is closed again and the
+// error is returned. The reporting goroutine is started only after setup has
+// succeeded, so a failed call leaves neither an open DB nor a goroutine behind.
+func NewDB(name, filename string) (db.DB, error) {
+	boltdb, err := bolt.Open(filename, 0600, nil)
+	if err != nil {
+		return nil, err
+	}
+	d := &localDB{
+		name: name,
+		db:   boltdb,
+		txCount: metrics2.GetCounter("db-active-tx", map[string]string{
+			"database": name,
+		}),
+		txNextId: 0,
+		txActive: map[int64]string{},
+	}
+
+	if err := d.update("NewDB", func(tx *bolt.Tx) error {
+		_, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS))
+		return err
+	}); err != nil {
+		// Do not leak the open BoltDB when setup fails. The setup error is the
+		// one returned; a failure to close is only logged.
+		if closeErr := boltdb.Close(); closeErr != nil {
+			glog.Errorf("Failed to close DB %s after failed setup: %s", name, closeErr)
+		}
+		return nil, err
+	}
+
+	// NOTE(review): nothing in this file stops this goroutine; it keeps running
+	// after Close.
+	go func() {
+		for range time.Tick(time.Minute) {
+			d.reportActiveTx()
+		}
+	}()
+
+	return d, nil
+}
+
+// See docs for DB interface. Close resets txActive while holding txMutex, then closes the underlying BoltDB.
+func (d *localDB) Close() error {
+ d.txMutex.Lock()
+ defer d.txMutex.Unlock()
+ d.txActive = map[int64]string{} // forget all registered transactions; txCount is left unchanged here
borenet 2016/08/16 19:49:54 nit: Should we also set the counter to zero here?
dogben 2016/08/17 13:29:44 I added a check that there are no active transacti
+
+ return d.db.Close()
+}
+
+// assignId sets t.Id from the tasks bucket's next sequence number and a
+// timestamp: t.Created unless util.TimeIsZero reports it as zero, in which case
+// now is used. It returns an error, leaving t untouched, if t.Id is already set
+// or the sequence number can not be obtained. tx must be an update transaction.
+func (d *localDB) assignId(tx *bolt.Tx, t *db.Task, now time.Time) error {
+	if t.Id != "" {
+		return fmt.Errorf("Task Id already assigned: %v", t.Id)
+	}
+	seq, err := tasksBucket(tx).NextSequence()
+	if err != nil {
+		return err
+	}
+	if util.TimeIsZero(t.Created) {
+		t.Id = formatId(now, seq)
+	} else {
+		t.Id = formatId(t.Created, seq)
+	}
+	return nil
+}
+
+// See docs for DB interface. AssignId assigns t.Id inside its own update
+// transaction; if the transaction fails, t.Id is restored to the value it had
+// on entry.
+func (d *localDB) AssignId(t *db.Task) error {
+	prevId := t.Id
+	assign := func(tx *bolt.Tx) error {
+		return d.assignId(tx, t, time.Now())
+	}
+	if err := d.update("AssignId", assign); err != nil {
+		t.Id = prevId
+		return err
+	}
+	return nil
+}
+
+// See docs for DB interface. GetTaskById returns the GOB-decoded task stored
+// under id. If no value is stored under id, it returns (nil, nil) when id is
+// well-formed and the parseId error otherwise.
+func (d *localDB) GetTaskById(id string) (*db.Task, error) {
+	var found *db.Task
+	lookup := func(tx *bolt.Tx) error {
+		data := tasksBucket(tx).Get([]byte(id))
+		if data == nil {
+			return nil
+		}
+		task := new(db.Task)
+		if err := gob.NewDecoder(bytes.NewReader(data)).Decode(task); err != nil {
+			return err
+		}
+		found = task
+		return nil
+	}
+	if err := d.view("GetTaskById", lookup); err != nil {
+		return nil, err
+	}
+	if found != nil {
+		return found, nil
+	}
+	// Nothing stored under id: return an error if id is invalid.
+	if _, _, err := parseId(id); err != nil {
+		return nil, err
+	}
+	return nil, nil
+}
+
+// See docs for DB interface. The scan starts at the first key not less than
+// start formatted with TIMESTAMP_FORMAT and stops at the first key that
+// compares greater than end formatted the same way, or as soon as the decoder's
+// Process returns false.
+// TODO(benjaminwagner): Filter Tasks based on Task.Created rather than Task.Id.
+func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error) {
+	lower := []byte(start.UTC().Format(TIMESTAMP_FORMAT))
+	upper := []byte(end.UTC().Format(TIMESTAMP_FORMAT))
+	decoder := db.TaskDecoder{}
+	scan := func(tx *bolt.Tx) error {
+		cur := tasksBucket(tx).Cursor()
+		key, val := cur.Seek(lower)
+		for key != nil && bytes.Compare(key, upper) <= 0 {
+			// Hand the decoder its own copy of the value.
+			// NOTE(review): presumably because the cursor's slices are only
+			// valid during the transaction — confirm against bolt docs.
+			buf := make([]byte, len(val))
+			copy(buf, val)
+			if !decoder.Process(buf) {
+				break
+			}
+			key, val = cur.Next()
+		}
+		return nil
+	}
+	if err := d.view("GetTasksFromDateRange", scan); err != nil {
+		return nil, err
+	}
+	return decoder.Result()
+}
+
+// See documentation for DB interface. PutTask stores t by delegating to PutTasks with a one-element slice.
+func (d *localDB) PutTask(t *db.Task) error {
+ return d.PutTasks([]*db.Task{t})
+}
+
+// validate returns an error if the task can not be inserted into the DB. Does
+// not modify t. It is currently a stub that accepts every task.
+func (d *localDB) validate(t *db.Task) error {
+ // TODO(benjaminwagner): Check skew between t.Id (if assigned) and t.Created.
+ return nil
+}
+
+// See documentation for DB interface. PutTasks inserts or updates all tasks in
+// a single update transaction. Tasks whose Id is empty get one assigned inside
+// the transaction. If validation or the transaction fails, the error is
+// returned and every task's Id is left at (or restored to) the value it had on
+// entry. On success the tasks are passed to modTasks.TrackModifiedTasks.
+// TODO(benjaminwagner): Figure out how to detect write/write conflicts and
+// return "concurrent modification" error.
+func (d *localDB) PutTasks(tasks []*db.Task) error {
+	// If there is an error during the transaction, we should leave the tasks
+	// unchanged. Save the old Ids since we set them below.
+	//
+	// oldIds[i] is the Id of tasks[i] on entry. It is filled by index so that it
+	// stays parallel to tasks: appending to a slice that already has len(tasks)
+	// elements would put len(tasks) empty Ids in front, and reverting would then
+	// wipe existing Ids and index past the end of tasks.
+	oldIds := make([]string, len(tasks))
+	// Validate and save current Ids.
+	for i, t := range tasks {
+		if err := d.validate(t); err != nil {
+			return err
+		}
+		oldIds[i] = t.Id
+	}
+	err := d.update("PutTasks", func(tx *bolt.Tx) error {
+		// Assign Ids and encode.
+		e := db.TaskEncoder{}
+		now := time.Now()
+		for _, t := range tasks {
+			if t.Id == "" {
+				if err := d.assignId(tx, t, now); err != nil {
+					return err
+				}
+			}
+			e.Process(t)
+		}
+		// Insert/update. e.Next returns a nil task once all results were consumed.
+		for {
+			t, serialized, err := e.Next()
+			if err != nil {
+				return err
+			}
+			if t == nil {
+				break
+			}
+			if err := tasksBucket(tx).Put([]byte(t.Id), serialized); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		// Revert the Ids assigned above.
+		for i, oldId := range oldIds {
+			tasks[i].Id = oldId
+		}
+		return err
+	}
+	// TODO(benjaminwagner): pass serialized bytes.
+	d.modTasks.TrackModifiedTasks(tasks)
+	return nil
+}
+
+// See docs for DB interface. GetModifiedTasks delegates to modTasks.GetModifiedTasks with the given id.
+func (d *localDB) GetModifiedTasks(id string) ([]*db.Task, error) {
+ return d.modTasks.GetModifiedTasks(id)
+}
+
+// See docs for DB interface. StartTrackingModifiedTasks delegates to modTasks.StartTrackingModifiedTasks.
+func (d *localDB) StartTrackingModifiedTasks() (string, error) {
+ return d.modTasks.StartTrackingModifiedTasks()
+}
+
+// Returns the total number of tasks in the DB.
+// TODO(benjaminwagner): add a metrics goroutine.
+func (d *localDB) NumTasks() (int, error) {
+ var n int
+ if err := d.view("NumTasks", func(tx *bolt.Tx) error {
+ n = tasksBucket(tx).Stats().KeyN
+ return nil
+ }); err != nil {
+ return -1, err
+ }
+ return n, nil
+}
+
+// RunBackupServer runs an HTTP server which provides downloadable database
+// backups. GET /backup streams the database, written from within a read-only
+// transaction, as an attachment named "tasks.db". port is passed to
+// http.ListenAndServe as the listen address. RunBackupServer blocks and returns
+// the error from http.ListenAndServe.
+func (d *localDB) RunBackupServer(port string) error {
+	router := mux.NewRouter()
+	router.HandleFunc("/backup", func(w http.ResponseWriter, r *http.Request) {
+		if err := d.view("Backup", func(tx *bolt.Tx) error {
+			w.Header().Set("Content-Type", "application/octet-stream")
+			w.Header().Set("Content-Disposition", "attachment; filename=\"tasks.db\"")
+			// Format the 64-bit size directly; converting it to int first would
+			// truncate it on platforms where int is 32 bits wide.
+			w.Header().Set("Content-Length", strconv.FormatInt(tx.Size(), 10))
+			_, err := tx.WriteTo(w)
+			return err
+		}); err != nil {
+			glog.Errorf("Failed to create DB backup: %s", err)
+			// NOTE(review): if WriteTo already sent part of the body, this error
+			// response can no longer replace it.
+			httputils.ReportError(w, r, err, "Failed to create DB backup")
+		}
+	})
+	// NOTE(review): this registers on http.DefaultServeMux, so a second call (or
+	// any other registration of "/") panics — consider passing the handler to
+	// ListenAndServe instead.
+	http.Handle("/", httputils.LoggingGzipRequestResponse(router))
+	return http.ListenAndServe(port, nil)
+}
« no previous file with comments | « build_scheduler/go/db/db_test.go ('k') | build_scheduler/go/db/local_db/local_db_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698