Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 package local_db | |
| 2 | |
| 3 import ( | |
| 4 "bytes" | |
| 5 "encoding/gob" | |
| 6 "fmt" | |
| 7 "net/http" | |
| 8 "strconv" | |
| 9 "strings" | |
| 10 "sync" | |
| 11 "time" | |
| 12 | |
| 13 "github.com/boltdb/bolt" | |
| 14 "github.com/gorilla/mux" | |
| 15 "github.com/skia-dev/glog" | |
| 16 "go.skia.org/infra/build_scheduler/go/db" | |
| 17 "go.skia.org/infra/go/httputils" | |
| 18 "go.skia.org/infra/go/metrics2" | |
| 19 "go.skia.org/infra/go/util" | |
| 20 ) | |
| 21 | |
| 22 const ( | |
| 23 // Tasks. Key is Task.Id, which is set to (creation time, sequence numbe r) | |
| 24 // (see formatId for detail), value is the GOB of the task. Tasks will b e | |
| 25 // updated in place. All repos share the same bucket. | |
| 26 // TODO(benjaminwagner): May need to prefix value with metadata. | |
| 27 BUCKET_TASKS = "tasks" | |
| 28 // BUCKET_TASKS will be append-mostly, so use a high fill percent. | |
| 29 BUCKET_TASKS_FILL_PERCENT = 0.9 | |
| 30 | |
| 31 // Similar to util.RFC3339NanoZeroPad, but since Task.Id can not contain | |
| 32 // colons, we omit most of the punctuation. This timestamp can only be u sed to | |
| 33 // format and parse times in UTC. | |
| 34 TIMESTAMP_FORMAT = "20060102T150405.000000000Z" | |
| 35 ) | |
| 36 | |
| 37 // formatId returns the timestamp and sequence number formatted for a Task ID. | |
| 38 // Format is "<encoded_timestamp>_<encoded_sequence_num>", where: | |
| 39 // - encoded_timestamp is the timestamp formatted using TIMESTAMP_FORMAT | |
| 40 // - encoded_sequence_num is | |
| 41 // "<lexical_ordering_char><hexidecimal_sequence_number>" | |
| 42 // - lexical_ordering_char is len(hexidecimal_sequence_number)-1 encoded as a | |
| 43 // hexidecimal digit, to allow lexical ordering of IDs. | |
| 44 // - hexidecimal_sequence_number is the sequence number formatted in | |
| 45 // hexidecimal digits with no padding. | |
| 46 // (Note that since the sequence number is uint64, the maximum possible value of | |
| 47 // len(hexidecimal_sequence_number)-1 is 15, so it can always be encoded as a | |
| 48 // single hexidecimal digit.) | |
|
borenet
2016/08/16 19:49:54
Why not just have this function zero-pad the seque
dogben
2016/08/17 13:29:44
You're right. I thought it was overkill to have th
| |
| 49 func formatId(t time.Time, seq uint64) string { | |
| 50 t = t.UTC() | |
| 51 h := fmt.Sprintf("%x", seq) | |
| 52 return fmt.Sprintf("%s_%x%s", t.Format(TIMESTAMP_FORMAT), len(h)-1, h) | |
| 53 } | |
| 54 | |
| 55 // parseId returns the timestamp and sequence number stored in a Task ID. | |
| 56 func parseId(id string) (time.Time, uint64, error) { | |
| 57 parts := strings.Split(id, "_") | |
| 58 if len(parts) != 2 { | |
| 59 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id) | |
| 60 } | |
| 61 t, err := time.Parse(TIMESTAMP_FORMAT, parts[0]) | |
| 62 if err != nil { | |
| 63 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr) | |
| 64 } | |
| 65 var seq uint64 | |
| 66 // Skip the first character (lexicographic ordering character). | |
| 67 if len(parts[1]) < 2 { | |
| 68 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected a t least 2 hex digits in second part.", id) | |
| 69 } | |
| 70 i, err := fmt.Sscanf(parts[1][1:], "%x", &seq) | |
| 71 if err != nil { | |
| 72 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr) | |
| 73 } else if i != 1 { | |
| 74 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected o ne hex number in %s, got %d", id, parts[1][1:], i) | |
| 75 } | |
| 76 return t, seq, nil | |
| 77 } | |
| 78 | |
| 79 // localDB accesses a local BoltDB database containing tasks. | |
| 80 type localDB struct { | |
| 81 // name is used in logging and metrics to identify this DB. | |
| 82 name string | |
| 83 | |
| 84 // db is the underlying BoltDB. | |
| 85 db *bolt.DB | |
| 86 | |
| 87 // tx fields contain metrics on the number of active transactions. Prote cted | |
| 88 // by txMutex. | |
| 89 txCount *metrics2.Counter | |
| 90 txNextId int64 | |
| 91 txActive map[int64]string | |
| 92 txMutex sync.RWMutex | |
| 93 | |
| 94 modTasks db.ModifiedTasks | |
| 95 } | |
| 96 | |
| 97 // startTx monitors when a transaction starts. | |
| 98 func (d *localDB) startTx(name string) int64 { | |
| 99 d.txMutex.Lock() | |
| 100 defer d.txMutex.Unlock() | |
| 101 d.txCount.Inc(1) | |
| 102 id := d.txNextId | |
| 103 d.txActive[id] = name | |
| 104 d.txNextId++ | |
| 105 return id | |
| 106 } | |
| 107 | |
| 108 // endTx monitors when a transaction ends. | |
| 109 func (d *localDB) endTx(id int64) { | |
| 110 d.txMutex.Lock() | |
| 111 defer d.txMutex.Unlock() | |
| 112 d.txCount.Dec(1) | |
| 113 delete(d.txActive, id) | |
| 114 } | |
| 115 | |
| 116 // reportActiveTx prints out the list of active transactions. | |
| 117 func (d *localDB) reportActiveTx() { | |
| 118 d.txMutex.RLock() | |
| 119 defer d.txMutex.RUnlock() | |
| 120 if len(d.txActive) == 0 { | |
| 121 glog.Infof("%s Active Transactions: (none)", d.name) | |
| 122 return | |
| 123 } | |
| 124 txs := make([]string, 0, len(d.txActive)) | |
| 125 for id, name := range d.txActive { | |
| 126 txs = append(txs, fmt.Sprintf(" %d\t%s", id, name)) | |
| 127 } | |
| 128 glog.Infof("%s Active Transactions:\n%s", d.name, strings.Join(txs, "\n" )) | |
| 129 } | |
| 130 | |
| 131 // tx is a wrapper for a BoltDB transaction which tracks statistics. | |
| 132 func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error { | |
| 133 txId := d.startTx(name) | |
| 134 defer d.endTx(txId) | |
| 135 defer metrics2.NewTimer("db-tx-duration", map[string]string{ | |
| 136 "database": d.name, | |
| 137 "transaction": name, | |
| 138 }).Stop() | |
| 139 if update { | |
| 140 return d.db.Update(fn) | |
| 141 } else { | |
| 142 return d.db.View(fn) | |
| 143 } | |
| 144 } | |
| 145 | |
| 146 // view is a wrapper for the BoltDB instance's View method. | |
| 147 func (d *localDB) view(name string, fn func(*bolt.Tx) error) error { | |
| 148 return d.tx(name, fn, false) | |
| 149 } | |
| 150 | |
| 151 // update is a wrapper for the BoltDB instance's Update method. | |
| 152 func (d *localDB) update(name string, fn func(*bolt.Tx) error) error { | |
| 153 return d.tx(name, fn, true) | |
| 154 } | |
| 155 | |
| 156 // Returns the tasks bucket with FillPercent set. | |
| 157 func tasksBucket(tx *bolt.Tx) *bolt.Bucket { | |
| 158 b := tx.Bucket([]byte(BUCKET_TASKS)) | |
| 159 b.FillPercent = BUCKET_TASKS_FILL_PERCENT | |
| 160 return b | |
| 161 } | |
| 162 | |
| 163 // NewDB returns a local DB instance. | |
| 164 func NewDB(name, filename string) (db.DB, error) { | |
| 165 boltdb, err := bolt.Open(filename, 0600, nil) | |
| 166 if err != nil { | |
| 167 return nil, err | |
| 168 } | |
| 169 d := &localDB{ | |
| 170 name: name, | |
| 171 db: boltdb, | |
| 172 txCount: metrics2.GetCounter("db-active-tx", map[string]string{ | |
| 173 "database": name, | |
| 174 }), | |
| 175 txNextId: 0, | |
| 176 txActive: map[int64]string{}, | |
| 177 } | |
| 178 go func() { | |
| 179 for _ = range time.Tick(time.Minute) { | |
| 180 d.reportActiveTx() | |
| 181 } | |
| 182 }() | |
| 183 | |
| 184 if err := d.update("NewDB", func(tx *bolt.Tx) error { | |
| 185 if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS)); e rr != nil { | |
| 186 return err | |
| 187 } | |
| 188 return nil | |
| 189 }); err != nil { | |
| 190 return nil, err | |
| 191 } | |
| 192 | |
| 193 return d, nil | |
| 194 } | |
| 195 | |
| 196 // See docs for DB interface. | |
| 197 func (d *localDB) Close() error { | |
| 198 d.txMutex.Lock() | |
| 199 defer d.txMutex.Unlock() | |
| 200 d.txActive = map[int64]string{} | |
|
borenet
2016/08/16 19:49:54
nit: Should we also set the counter to zero here?
dogben
2016/08/17 13:29:44
I added a check that there are no active transacti
| |
| 201 | |
| 202 return d.db.Close() | |
| 203 } | |
| 204 | |
| 205 // Sets t.Id either based on t.Created or now. tx must be an update transaction. | |
| 206 func (d *localDB) assignId(tx *bolt.Tx, t *db.Task, now time.Time) error { | |
| 207 if t.Id != "" { | |
| 208 return fmt.Errorf("Task Id already assigned: %v", t.Id) | |
| 209 } | |
| 210 ts := now | |
| 211 if !util.TimeIsZero(t.Created) { | |
| 212 ts = t.Created | |
| 213 } | |
| 214 seq, err := tasksBucket(tx).NextSequence() | |
| 215 if err != nil { | |
| 216 return err | |
| 217 } | |
| 218 t.Id = formatId(ts, seq) | |
| 219 return nil | |
| 220 } | |
| 221 | |
| 222 // See docs for DB interface. | |
| 223 func (d *localDB) AssignId(t *db.Task) error { | |
| 224 oldId := t.Id | |
| 225 err := d.update("AssignId", func(tx *bolt.Tx) error { | |
| 226 return d.assignId(tx, t, time.Now()) | |
| 227 }) | |
| 228 if err != nil { | |
| 229 t.Id = oldId | |
| 230 } | |
| 231 return err | |
| 232 } | |
| 233 | |
| 234 // See docs for DB interface. | |
| 235 func (d *localDB) GetTaskById(id string) (*db.Task, error) { | |
| 236 var rv *db.Task | |
| 237 if err := d.view("GetTaskById", func(tx *bolt.Tx) error { | |
| 238 serialized := tasksBucket(tx).Get([]byte(id)) | |
| 239 if serialized == nil { | |
| 240 return nil | |
| 241 } | |
| 242 var t db.Task | |
| 243 if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&t) ; err != nil { | |
| 244 return err | |
| 245 } | |
| 246 rv = &t | |
| 247 return nil | |
| 248 }); err != nil { | |
| 249 return nil, err | |
| 250 } | |
| 251 if rv == nil { | |
| 252 // Return an error if id is invalid. | |
| 253 if _, _, err := parseId(id); err != nil { | |
| 254 return nil, err | |
| 255 } | |
| 256 } | |
| 257 return rv, nil | |
| 258 } | |
| 259 | |
| 260 // See docs for DB interface. | |
| 261 // TODO(benjaminwagner): Filter Tasks based on Task.Created rather than Task.Id. | |
| 262 func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error ) { | |
| 263 min := []byte(start.UTC().Format(TIMESTAMP_FORMAT)) | |
| 264 max := []byte(end.UTC().Format(TIMESTAMP_FORMAT)) | |
| 265 decoder := db.TaskDecoder{} | |
| 266 if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error { | |
| 267 c := tasksBucket(tx).Cursor() | |
| 268 for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() { | |
| 269 cpy := make([]byte, len(v)) | |
| 270 copy(cpy, v) | |
| 271 if !decoder.Process(cpy) { | |
| 272 return nil | |
| 273 } | |
| 274 } | |
| 275 return nil | |
| 276 }); err != nil { | |
| 277 return nil, err | |
| 278 } | |
| 279 return decoder.Result() | |
| 280 } | |
| 281 | |
| 282 // See documentation for DB interface. | |
| 283 func (d *localDB) PutTask(t *db.Task) error { | |
| 284 return d.PutTasks([]*db.Task{t}) | |
| 285 } | |
| 286 | |
| 287 // validate returns an error if the task can not be inserted into the DB. Does | |
| 288 // not modify t. | |
| 289 func (d *localDB) validate(t *db.Task) error { | |
| 290 // TODO(benjaminwagner): Check skew between t.Id (if assigned) and t.Cre ated. | |
| 291 return nil | |
| 292 } | |
| 293 | |
| 294 // See documentation for DB interface. | |
| 295 // TODO(benjaminwagner): Figure out how to detect write/write conflicts and | |
| 296 // return "concurrent modification" error. | |
| 297 func (d *localDB) PutTasks(tasks []*db.Task) error { | |
| 298 // If there is an error during the transaction, we should leave the task s | |
| 299 // unchanged. Save the old Ids since we set them below. | |
| 300 oldIds := make([]string, len(tasks)) | |
| 301 // Validate and save current Ids. | |
| 302 for _, t := range tasks { | |
| 303 if err := d.validate(t); err != nil { | |
| 304 return err | |
| 305 } | |
| 306 oldIds = append(oldIds, t.Id) | |
| 307 } | |
| 308 revertChanges := func() { | |
| 309 for i, oldId := range oldIds { | |
| 310 tasks[i].Id = oldId | |
| 311 } | |
| 312 } | |
| 313 err := d.update("PutTasks", func(tx *bolt.Tx) error { | |
| 314 // Assign Ids and encode. | |
| 315 e := db.TaskEncoder{} | |
| 316 now := time.Now() | |
| 317 for _, t := range tasks { | |
| 318 if t.Id == "" { | |
| 319 if err := d.assignId(tx, t, now); err != nil { | |
| 320 return err | |
| 321 } | |
| 322 } | |
| 323 e.Process(t) | |
| 324 } | |
| 325 // Insert/update. | |
| 326 for { | |
| 327 t, serialized, err := e.Next() | |
| 328 if err != nil { | |
| 329 return err | |
| 330 } | |
| 331 if t == nil { | |
| 332 break | |
| 333 } | |
| 334 if err := tasksBucket(tx).Put([]byte(t.Id), serialized); err != nil { | |
| 335 return err | |
| 336 } | |
| 337 } | |
| 338 return nil | |
| 339 }) | |
| 340 if err != nil { | |
| 341 revertChanges() | |
| 342 return err | |
| 343 } else { | |
| 344 // TODO(benjaminwagner): pass serialized bytes. | |
| 345 d.modTasks.TrackModifiedTasks(tasks) | |
| 346 } | |
| 347 return nil | |
| 348 } | |
| 349 | |
| 350 // See docs for DB interface. | |
| 351 func (d *localDB) GetModifiedTasks(id string) ([]*db.Task, error) { | |
| 352 return d.modTasks.GetModifiedTasks(id) | |
| 353 } | |
| 354 | |
| 355 // See docs for DB interface. | |
| 356 func (d *localDB) StartTrackingModifiedTasks() (string, error) { | |
| 357 return d.modTasks.StartTrackingModifiedTasks() | |
| 358 } | |
| 359 | |
| 360 // Returns the total number of tasks in the DB. | |
| 361 // TODO(benjaminwagner): add a metrics goroutine. | |
| 362 func (d *localDB) NumTasks() (int, error) { | |
| 363 var n int | |
| 364 if err := d.view("NumTasks", func(tx *bolt.Tx) error { | |
| 365 n = tasksBucket(tx).Stats().KeyN | |
| 366 return nil | |
| 367 }); err != nil { | |
| 368 return -1, err | |
| 369 } | |
| 370 return n, nil | |
| 371 } | |
| 372 | |
| 373 // RunBackupServer runs an HTTP server which provides downloadable database | |
| 374 // backups. | |
| 375 func (d *localDB) RunBackupServer(port string) error { | |
| 376 r := mux.NewRouter() | |
| 377 r.HandleFunc("/backup", func(w http.ResponseWriter, r *http.Request) { | |
| 378 if err := d.view("Backup", func(tx *bolt.Tx) error { | |
| 379 w.Header().Set("Content-Type", "application/octet-stream ") | |
| 380 w.Header().Set("Content-Disposition", "attachment; filen ame=\"tasks.db\"") | |
| 381 w.Header().Set("Content-Length", strconv.Itoa(int(tx.Siz e()))) | |
| 382 _, err := tx.WriteTo(w) | |
| 383 return err | |
| 384 }); err != nil { | |
| 385 glog.Errorf("Failed to create DB backup: %s", err) | |
| 386 httputils.ReportError(w, r, err, "Failed to create DB ba ckup") | |
| 387 } | |
| 388 }) | |
| 389 http.Handle("/", httputils.LoggingGzipRequestResponse(r)) | |
| 390 return http.ListenAndServe(port, nil) | |
| 391 } | |
| OLD | NEW |