Index: build_scheduler/go/db/local_db/local_db.go |
diff --git a/build_scheduler/go/db/local_db/local_db.go b/build_scheduler/go/db/local_db/local_db.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..a47275215418212fffddda05b9a1af6623354233 |
--- /dev/null |
+++ b/build_scheduler/go/db/local_db/local_db.go |
@@ -0,0 +1,391 @@ |
+package local_db |
+ |
+import ( |
+ "bytes" |
+ "encoding/gob" |
+ "fmt" |
+ "net/http" |
+ "strconv" |
+ "strings" |
+ "sync" |
+ "time" |
+ |
+ "github.com/boltdb/bolt" |
+ "github.com/gorilla/mux" |
+ "github.com/skia-dev/glog" |
+ "go.skia.org/infra/build_scheduler/go/db" |
+ "go.skia.org/infra/go/httputils" |
+ "go.skia.org/infra/go/metrics2" |
+ "go.skia.org/infra/go/util" |
+) |
+ |
+const ( |
+ // Tasks. Key is Task.Id, which is set to (creation time, sequence number) |
+ // (see formatId for detail), value is the GOB of the task. Tasks will be |
+ // updated in place. All repos share the same bucket. |
+ // TODO(benjaminwagner): May need to prefix value with metadata. |
+ BUCKET_TASKS = "tasks" |
+ // BUCKET_TASKS will be append-mostly, so use a high fill percent. |
+ BUCKET_TASKS_FILL_PERCENT = 0.9 |
+ |
+ // Similar to util.RFC3339NanoZeroPad, but since Task.Id can not contain |
+ // colons, we omit most of the punctuation. This timestamp can only be used to |
+ // format and parse times in UTC. |
+ TIMESTAMP_FORMAT = "20060102T150405.000000000Z" |
+) |
+ |
+// formatId returns the timestamp and sequence number formatted for a Task ID. |
+// Format is "<encoded_timestamp>_<encoded_sequence_num>", where: |
+// - encoded_timestamp is the timestamp formatted using TIMESTAMP_FORMAT |
+// - encoded_sequence_num is |
+// "<lexical_ordering_char><hexidecimal_sequence_number>" |
+// - lexical_ordering_char is len(hexidecimal_sequence_number)-1 encoded as a |
+// hexidecimal digit, to allow lexical ordering of IDs. |
+// - hexidecimal_sequence_number is the sequence number formatted in |
+// hexidecimal digits with no padding. |
+// (Note that since the sequence number is uint64, the maximum possible value of |
+// len(hexidecimal_sequence_number)-1 is 15, so it can always be encoded as a |
+// single hexidecimal digit.) |
borenet
2016/08/16 19:49:54
Why not just have this function zero-pad the seque
dogben
2016/08/17 13:29:44
You're right. I thought it was overkill to have th
|
+func formatId(t time.Time, seq uint64) string { |
+ t = t.UTC() |
+ h := fmt.Sprintf("%x", seq) |
+ return fmt.Sprintf("%s_%x%s", t.Format(TIMESTAMP_FORMAT), len(h)-1, h) |
+} |
+ |
+// parseId returns the timestamp and sequence number stored in a Task ID. |
+func parseId(id string) (time.Time, uint64, error) { |
+ parts := strings.Split(id, "_") |
+ if len(parts) != 2 { |
+ return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id) |
+ } |
+ t, err := time.Parse(TIMESTAMP_FORMAT, parts[0]) |
+ if err != nil { |
+ return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, err) |
+ } |
+ var seq uint64 |
+ // Skip the first character (lexicographic ordering character). |
+ if len(parts[1]) < 2 { |
+ return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected at least 2 hex digits in second part.", id) |
+ } |
+ i, err := fmt.Sscanf(parts[1][1:], "%x", &seq) |
+ if err != nil { |
+ return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, err) |
+ } else if i != 1 { |
+ return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected one hex number in %s, got %d", id, parts[1][1:], i) |
+ } |
+ return t, seq, nil |
+} |
+ |
+// localDB accesses a local BoltDB database containing tasks. |
+type localDB struct { |
+ // name is used in logging and metrics to identify this DB. |
+ name string |
+ |
+ // db is the underlying BoltDB. |
+ db *bolt.DB |
+ |
+ // tx fields contain metrics on the number of active transactions. Protected |
+ // by txMutex. |
+ txCount *metrics2.Counter |
+ txNextId int64 |
+ txActive map[int64]string |
+ txMutex sync.RWMutex |
+ |
+ modTasks db.ModifiedTasks |
+} |
+ |
+// startTx monitors when a transaction starts. |
+func (d *localDB) startTx(name string) int64 { |
+ d.txMutex.Lock() |
+ defer d.txMutex.Unlock() |
+ d.txCount.Inc(1) |
+ id := d.txNextId |
+ d.txActive[id] = name |
+ d.txNextId++ |
+ return id |
+} |
+ |
+// endTx monitors when a transaction ends. |
+func (d *localDB) endTx(id int64) { |
+ d.txMutex.Lock() |
+ defer d.txMutex.Unlock() |
+ d.txCount.Dec(1) |
+ delete(d.txActive, id) |
+} |
+ |
+// reportActiveTx prints out the list of active transactions. |
+func (d *localDB) reportActiveTx() { |
+ d.txMutex.RLock() |
+ defer d.txMutex.RUnlock() |
+ if len(d.txActive) == 0 { |
+ glog.Infof("%s Active Transactions: (none)", d.name) |
+ return |
+ } |
+ txs := make([]string, 0, len(d.txActive)) |
+ for id, name := range d.txActive { |
+ txs = append(txs, fmt.Sprintf(" %d\t%s", id, name)) |
+ } |
+ glog.Infof("%s Active Transactions:\n%s", d.name, strings.Join(txs, "\n")) |
+} |
+ |
+// tx is a wrapper for a BoltDB transaction which tracks statistics. |
+func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error { |
+ txId := d.startTx(name) |
+ defer d.endTx(txId) |
+ defer metrics2.NewTimer("db-tx-duration", map[string]string{ |
+ "database": d.name, |
+ "transaction": name, |
+ }).Stop() |
+ if update { |
+ return d.db.Update(fn) |
+ } else { |
+ return d.db.View(fn) |
+ } |
+} |
+ |
+// view is a wrapper for the BoltDB instance's View method. |
+func (d *localDB) view(name string, fn func(*bolt.Tx) error) error { |
+ return d.tx(name, fn, false) |
+} |
+ |
+// update is a wrapper for the BoltDB instance's Update method. |
+func (d *localDB) update(name string, fn func(*bolt.Tx) error) error { |
+ return d.tx(name, fn, true) |
+} |
+ |
+// Returns the tasks bucket with FillPercent set. |
+func tasksBucket(tx *bolt.Tx) *bolt.Bucket { |
+ b := tx.Bucket([]byte(BUCKET_TASKS)) |
+ b.FillPercent = BUCKET_TASKS_FILL_PERCENT |
+ return b |
+} |
+ |
+// NewDB returns a local DB instance. |
+func NewDB(name, filename string) (db.DB, error) { |
+ boltdb, err := bolt.Open(filename, 0600, nil) |
+ if err != nil { |
+ return nil, err |
+ } |
+ d := &localDB{ |
+ name: name, |
+ db: boltdb, |
+ txCount: metrics2.GetCounter("db-active-tx", map[string]string{ |
+ "database": name, |
+ }), |
+ txNextId: 0, |
+ txActive: map[int64]string{}, |
+ } |
+ go func() { |
+ for _ = range time.Tick(time.Minute) { |
+ d.reportActiveTx() |
+ } |
+ }() |
+ |
+ if err := d.update("NewDB", func(tx *bolt.Tx) error { |
+ if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS)); err != nil { |
+ return err |
+ } |
+ return nil |
+ }); err != nil { |
+ return nil, err |
+ } |
+ |
+ return d, nil |
+} |
+ |
+// See docs for DB interface. |
+func (d *localDB) Close() error { |
+ d.txMutex.Lock() |
+ defer d.txMutex.Unlock() |
+ d.txActive = map[int64]string{} |
borenet
2016/08/16 19:49:54
nit: Should we also set the counter to zero here?
dogben
2016/08/17 13:29:44
I added a check that there are no active transacti
|
+ |
+ return d.db.Close() |
+} |
+ |
+// Sets t.Id either based on t.Created or now. tx must be an update transaction. |
+func (d *localDB) assignId(tx *bolt.Tx, t *db.Task, now time.Time) error { |
+ if t.Id != "" { |
+ return fmt.Errorf("Task Id already assigned: %v", t.Id) |
+ } |
+ ts := now |
+ if !util.TimeIsZero(t.Created) { |
+ ts = t.Created |
+ } |
+ seq, err := tasksBucket(tx).NextSequence() |
+ if err != nil { |
+ return err |
+ } |
+ t.Id = formatId(ts, seq) |
+ return nil |
+} |
+ |
+// See docs for DB interface. |
+func (d *localDB) AssignId(t *db.Task) error { |
+ oldId := t.Id |
+ err := d.update("AssignId", func(tx *bolt.Tx) error { |
+ return d.assignId(tx, t, time.Now()) |
+ }) |
+ if err != nil { |
+ t.Id = oldId |
+ } |
+ return err |
+} |
+ |
+// See docs for DB interface. |
+func (d *localDB) GetTaskById(id string) (*db.Task, error) { |
+ var rv *db.Task |
+ if err := d.view("GetTaskById", func(tx *bolt.Tx) error { |
+ serialized := tasksBucket(tx).Get([]byte(id)) |
+ if serialized == nil { |
+ return nil |
+ } |
+ var t db.Task |
+ if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&t); err != nil { |
+ return err |
+ } |
+ rv = &t |
+ return nil |
+ }); err != nil { |
+ return nil, err |
+ } |
+ if rv == nil { |
+ // Return an error if id is invalid. |
+ if _, _, err := parseId(id); err != nil { |
+ return nil, err |
+ } |
+ } |
+ return rv, nil |
+} |
+ |
+// See docs for DB interface. |
+// TODO(benjaminwagner): Filter Tasks based on Task.Created rather than Task.Id. |
+func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error) { |
+ min := []byte(start.UTC().Format(TIMESTAMP_FORMAT)) |
+ max := []byte(end.UTC().Format(TIMESTAMP_FORMAT)) |
+ decoder := db.TaskDecoder{} |
+ if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error { |
+ c := tasksBucket(tx).Cursor() |
+ for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() { |
+ cpy := make([]byte, len(v)) |
+ copy(cpy, v) |
+ if !decoder.Process(cpy) { |
+ return nil |
+ } |
+ } |
+ return nil |
+ }); err != nil { |
+ return nil, err |
+ } |
+ return decoder.Result() |
+} |
+ |
+// See documentation for DB interface. |
+func (d *localDB) PutTask(t *db.Task) error { |
+ return d.PutTasks([]*db.Task{t}) |
+} |
+ |
+// validate returns an error if the task can not be inserted into the DB. Does |
+// not modify t. |
+func (d *localDB) validate(t *db.Task) error { |
+ // TODO(benjaminwagner): Check skew between t.Id (if assigned) and t.Created. |
+ return nil |
+} |
+ |
+// See documentation for DB interface. |
+// TODO(benjaminwagner): Figure out how to detect write/write conflicts and |
+// return "concurrent modification" error. |
+func (d *localDB) PutTasks(tasks []*db.Task) error { |
+ // If there is an error during the transaction, we should leave the tasks |
+ // unchanged. Save the old Ids since we set them below. |
+ oldIds := make([]string, len(tasks)) |
+ // Validate and save current Ids. |
+ for _, t := range tasks { |
+ if err := d.validate(t); err != nil { |
+ return err |
+ } |
+ oldIds = append(oldIds, t.Id) |
+ } |
+ revertChanges := func() { |
+ for i, oldId := range oldIds { |
+ tasks[i].Id = oldId |
+ } |
+ } |
+ err := d.update("PutTasks", func(tx *bolt.Tx) error { |
+ // Assign Ids and encode. |
+ e := db.TaskEncoder{} |
+ now := time.Now() |
+ for _, t := range tasks { |
+ if t.Id == "" { |
+ if err := d.assignId(tx, t, now); err != nil { |
+ return err |
+ } |
+ } |
+ e.Process(t) |
+ } |
+ // Insert/update. |
+ for { |
+ t, serialized, err := e.Next() |
+ if err != nil { |
+ return err |
+ } |
+ if t == nil { |
+ break |
+ } |
+ if err := tasksBucket(tx).Put([]byte(t.Id), serialized); err != nil { |
+ return err |
+ } |
+ } |
+ return nil |
+ }) |
+ if err != nil { |
+ revertChanges() |
+ return err |
+ } else { |
+ // TODO(benjaminwagner): pass serialized bytes. |
+ d.modTasks.TrackModifiedTasks(tasks) |
+ } |
+ return nil |
+} |
+ |
+// See docs for DB interface. |
+func (d *localDB) GetModifiedTasks(id string) ([]*db.Task, error) { |
+ return d.modTasks.GetModifiedTasks(id) |
+} |
+ |
+// See docs for DB interface. |
+func (d *localDB) StartTrackingModifiedTasks() (string, error) { |
+ return d.modTasks.StartTrackingModifiedTasks() |
+} |
+ |
+// Returns the total number of tasks in the DB. |
+// TODO(benjaminwagner): add a metrics goroutine. |
+func (d *localDB) NumTasks() (int, error) { |
+ var n int |
+ if err := d.view("NumTasks", func(tx *bolt.Tx) error { |
+ n = tasksBucket(tx).Stats().KeyN |
+ return nil |
+ }); err != nil { |
+ return -1, err |
+ } |
+ return n, nil |
+} |
+ |
+// RunBackupServer runs an HTTP server which provides downloadable database |
+// backups. |
+func (d *localDB) RunBackupServer(port string) error { |
+ r := mux.NewRouter() |
+ r.HandleFunc("/backup", func(w http.ResponseWriter, r *http.Request) { |
+ if err := d.view("Backup", func(tx *bolt.Tx) error { |
+ w.Header().Set("Content-Type", "application/octet-stream") |
+ w.Header().Set("Content-Disposition", "attachment; filename=\"tasks.db\"") |
+ w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size()))) |
+ _, err := tx.WriteTo(w) |
+ return err |
+ }); err != nil { |
+ glog.Errorf("Failed to create DB backup: %s", err) |
+ httputils.ReportError(w, r, err, "Failed to create DB backup") |
+ } |
+ }) |
+ http.Handle("/", httputils.LoggingGzipRequestResponse(r)) |
+ return http.ListenAndServe(port, nil) |
+} |