Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(252)

Side by Side Diff: build_scheduler/go/db/local_db/local_db.go

Issue 2296763008: [task scheduler] Move files from build_scheduler/ to task_scheduler/ (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 package local_db
2
3 import (
4 "bytes"
5 "encoding/binary"
6 "encoding/gob"
7 "fmt"
8 "net/http"
9 "sort"
10 "strconv"
11 "strings"
12 "sync"
13 "time"
14
15 "github.com/boltdb/bolt"
16 "github.com/gorilla/mux"
17 "github.com/skia-dev/glog"
18 "go.skia.org/infra/build_scheduler/go/db"
19 "go.skia.org/infra/go/boltutil"
20 "go.skia.org/infra/go/httputils"
21 "go.skia.org/infra/go/metrics2"
22 "go.skia.org/infra/go/util"
23 )
24
const (
	// BUCKET_TASKS is the name of the Tasks bucket. Key is Task.Id, which is set
	// to (creation time, sequence number) (see formatId for detail), value is
	// described in docs for BUCKET_TASKS_VERSION. Tasks will be updated in place.
	// All repos share the same bucket.
	BUCKET_TASKS = "tasks"
	// BUCKET_TASKS_FILL_PERCENT is the value to set for bolt.Bucket.FillPercent
	// for BUCKET_TASKS. BUCKET_TASKS will be append-mostly, so use a high fill
	// percent.
	BUCKET_TASKS_FILL_PERCENT = 0.9
	// BUCKET_TASKS_VERSION indicates the format of the value of BUCKET_TASKS
	// written by PutTasks. Retrieving Tasks from the DB must support all previous
	// versions. For all versions, the first byte is the version number.
	//   Version 1: v[0] = 1; v[1:9] is the modified time as UnixNano encoded as
	//     big endian; v[9:] is the GOB of the Task.
	BUCKET_TASKS_VERSION = 1

	// TIMESTAMP_FORMAT is a format string passed to Time.Format and time.Parse to
	// format/parse the timestamp in the Task ID. It is similar to
	// util.RFC3339NanoZeroPad, but since Task.Id can not contain colons, we omit
	// most of the punctuation. This timestamp can only be used to format and
	// parse times in UTC.
	TIMESTAMP_FORMAT = "20060102T150405.000000000Z"
	// SEQUENCE_NUMBER_FORMAT is a format string passed to fmt.Sprintf or
	// fmt.Sscanf to format/parse the sequence number in the Task ID. It is a
	// 16-digit zero-padded lowercase hexadecimal number.
	SEQUENCE_NUMBER_FORMAT = "%016x"

	// MAX_CREATED_TIME_SKEW is the maximum difference between the timestamp in a
	// Task's Id field and that Task's Created field. This allows AssignId to be
	// called before creating the Swarming task so that the Id can be included in
	// the Swarming task tags. GetTasksFromDateRange accounts for this skew when
	// retrieving tasks. This value can be increased in the future, but can never
	// be decreased.
	//
	// 6 minutes is based on httputils.DIAL_TIMEOUT + httputils.REQUEST_TIMEOUT,
	// which is assumed to be the approximate maximum duration of a successful
	// swarming.ApiClient.TriggerTask() call.
	MAX_CREATED_TIME_SKEW = 6 * time.Minute
)
65
66 // formatId returns the timestamp and sequence number formatted for a Task ID.
67 // Format is "<timestamp>_<sequence_num>", where the timestamp is formatted
68 // using TIMESTAMP_FORMAT and sequence_num is formatted using
69 // SEQUENCE_NUMBER_FORMAT.
70 func formatId(t time.Time, seq uint64) string {
71 t = t.UTC()
72 return fmt.Sprintf("%s_"+SEQUENCE_NUMBER_FORMAT, t.Format(TIMESTAMP_FORM AT), seq)
73 }
74
75 // parseId returns the timestamp and sequence number stored in a Task ID.
76 func parseId(id string) (time.Time, uint64, error) {
77 parts := strings.Split(id, "_")
78 if len(parts) != 2 {
79 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id)
80 }
81 t, err := time.Parse(TIMESTAMP_FORMAT, parts[0])
82 if err != nil {
83 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr)
84 }
85 var seq uint64
86 // Add newlines to force Sscanf to match the entire string. Otherwise
87 // "123hello" will be parsed as 123. Note that Sscanf does not require 1 6
88 // digits even though SEQUENCE_NUMBER_FORMAT specifies padding to 16 dig its.
89 i, err := fmt.Sscanf(parts[1]+"\n", SEQUENCE_NUMBER_FORMAT+"\n", &seq)
90 if err != nil {
91 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr)
92 } else if i != 1 {
93 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected o ne hex number in %s, got %d", id, parts[1], i)
94 }
95 return t, seq, nil
96 }
97
// packV1 builds a BUCKET_TASKS value in the version 1 layout: one version
// byte (1), then the modified time t as big-endian UnixNano in eight bytes,
// then the bytes of serialized (the GOB of the Task).
func packV1(t time.Time, serialized []byte) []byte {
	const headerLen = 9
	packed := make([]byte, headerLen, headerLen+len(serialized))
	packed[0] = 1
	binary.BigEndian.PutUint64(packed[1:headerLen], uint64(t.UnixNano()))
	// Capacity was reserved above, so this append fills the same allocation.
	return append(packed, serialized...)
}
107
// unpackV1 gets the modified time and GOB of the Task from a value as described
// by BUCKET_TASKS_VERSION = 1. The returned GOB shares structure with value.
// The modified time is returned in UTC. An error is returned if value is
// shorter than the 9-byte header or if its version byte is not 1.
func unpackV1(value []byte) (time.Time, []byte, error) {
	if len(value) < 9 {
		return time.Time{}, nil, fmt.Errorf("unpackV1 value is too short (%d bytes)", len(value))
	}
	if value[0] != 1 {
		return time.Time{}, nil, fmt.Errorf("unpackV1 called for value with version %d", value[0])
	}
	// v[1:9] holds the modified time as big-endian UnixNano.
	t := time.Unix(0, int64(binary.BigEndian.Uint64(value[1:9]))).UTC()
	return t, value[9:], nil
}
120
// localDB accesses a local BoltDB database containing tasks.
type localDB struct {
	// name is used in logging and metrics to identify this DB.
	name string

	// db is the underlying BoltDB.
	db *bolt.DB

	// tx fields contain metrics on the number of active transactions. Protected
	// by txMutex. txNextId is the id handed out by the next startTx call and
	// txActive maps the id of each in-flight transaction to its name.
	txCount  *metrics2.Counter
	txNextId int64
	txActive map[int64]string
	txMutex  sync.RWMutex

	// dbMetric is created by NewDB via boltutil.NewDbMetric and deleted by
	// Close.
	dbMetric *boltutil.DbMetric

	// modTasks backs GetModifiedTasks and Start/StopTrackingModifiedTasks.
	// PutTasks passes it the GOBs of the tasks it wrote.
	modTasks db.ModifiedTasks

	// Close will send on each of these channels to indicate goroutines should
	// stop.
	notifyOnClose []chan bool
}
144
145 // startTx monitors when a transaction starts.
146 func (d *localDB) startTx(name string) int64 {
147 d.txMutex.Lock()
148 defer d.txMutex.Unlock()
149 d.txCount.Inc(1)
150 id := d.txNextId
151 d.txActive[id] = name
152 d.txNextId++
153 return id
154 }
155
156 // endTx monitors when a transaction ends.
157 func (d *localDB) endTx(id int64) {
158 d.txMutex.Lock()
159 defer d.txMutex.Unlock()
160 d.txCount.Dec(1)
161 delete(d.txActive, id)
162 }
163
164 // reportActiveTx prints out the list of active transactions.
165 func (d *localDB) reportActiveTx() {
166 d.txMutex.RLock()
167 defer d.txMutex.RUnlock()
168 if len(d.txActive) == 0 {
169 glog.Infof("%s Active Transactions: (none)", d.name)
170 return
171 }
172 txs := make([]string, 0, len(d.txActive))
173 for id, name := range d.txActive {
174 txs = append(txs, fmt.Sprintf(" %d\t%s", id, name))
175 }
176 glog.Infof("%s Active Transactions:\n%s", d.name, strings.Join(txs, "\n" ))
177 }
178
179 // tx is a wrapper for a BoltDB transaction which tracks statistics.
180 func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error {
181 txId := d.startTx(name)
182 defer d.endTx(txId)
183 defer metrics2.NewTimer("db-tx-duration", map[string]string{
184 "database": d.name,
185 "transaction": name,
186 }).Stop()
187 if update {
188 return d.db.Update(fn)
189 } else {
190 return d.db.View(fn)
191 }
192 }
193
194 // view is a wrapper for the BoltDB instance's View method.
195 func (d *localDB) view(name string, fn func(*bolt.Tx) error) error {
196 return d.tx(name, fn, false)
197 }
198
199 // update is a wrapper for the BoltDB instance's Update method.
200 func (d *localDB) update(name string, fn func(*bolt.Tx) error) error {
201 return d.tx(name, fn, true)
202 }
203
204 // Returns the tasks bucket with FillPercent set.
205 func tasksBucket(tx *bolt.Tx) *bolt.Bucket {
206 b := tx.Bucket([]byte(BUCKET_TASKS))
207 b.FillPercent = BUCKET_TASKS_FILL_PERCENT
208 return b
209 }
210
211 // NewDB returns a local DB instance.
212 func NewDB(name, filename string) (db.DB, error) {
213 boltdb, err := bolt.Open(filename, 0600, nil)
214 if err != nil {
215 return nil, err
216 }
217 d := &localDB{
218 name: name,
219 db: boltdb,
220 txCount: metrics2.GetCounter("db-active-tx", map[string]string{
221 "database": name,
222 }),
223 txNextId: 0,
224 txActive: map[int64]string{},
225 }
226
227 stopReportActiveTx := make(chan bool)
228 d.notifyOnClose = append(d.notifyOnClose, stopReportActiveTx)
229 go func() {
230 t := time.NewTicker(time.Minute)
231 for {
232 select {
233 case <-stopReportActiveTx:
234 t.Stop()
235 return
236 case <-t.C:
237 d.reportActiveTx()
238 }
239 }
240 }()
241
242 if err := d.update("NewDB", func(tx *bolt.Tx) error {
243 if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS)); e rr != nil {
244 return err
245 }
246 return nil
247 }); err != nil {
248 return nil, err
249 }
250
251 if dbMetric, err := boltutil.NewDbMetric(boltdb, []string{BUCKET_TASKS}, map[string]string{"database": name}); err != nil {
252 return nil, err
253 } else {
254 d.dbMetric = dbMetric
255 }
256
257 return d, nil
258 }
259
260 // See docs for DB interface.
261 func (d *localDB) Close() error {
262 d.txMutex.Lock()
263 defer d.txMutex.Unlock()
264 if len(d.txActive) > 0 {
265 return fmt.Errorf("Can not close DB when transactions are active .")
266 }
267 for _, c := range d.notifyOnClose {
268 c <- true
269 }
270 d.txActive = map[int64]string{}
271 if err := d.dbMetric.Delete(); err != nil {
272 return err
273 }
274 d.dbMetric = nil
275 if err := d.txCount.Delete(); err != nil {
276 return err
277 }
278 d.txCount = nil
279 return d.db.Close()
280 }
281
282 // Sets t.Id either based on t.Created or now. tx must be an update transaction.
283 func (d *localDB) assignId(tx *bolt.Tx, t *db.Task, now time.Time) error {
284 if t.Id != "" {
285 return fmt.Errorf("Task Id already assigned: %v", t.Id)
286 }
287 ts := now
288 if !util.TimeIsZero(t.Created) {
289 // TODO(benjaminwagner): Disallow assigning IDs based on t.Creat ed; or
290 // ensure t.Created is > any ID ts in the DB.
291 ts = t.Created
292 }
293 seq, err := tasksBucket(tx).NextSequence()
294 if err != nil {
295 return err
296 }
297 t.Id = formatId(ts, seq)
298 return nil
299 }
300
301 // See docs for DB interface.
302 func (d *localDB) AssignId(t *db.Task) error {
303 oldId := t.Id
304 err := d.update("AssignId", func(tx *bolt.Tx) error {
305 return d.assignId(tx, t, time.Now())
306 })
307 if err != nil {
308 t.Id = oldId
309 }
310 return err
311 }
312
313 // See docs for DB interface.
314 func (d *localDB) GetTaskById(id string) (*db.Task, error) {
315 var rv *db.Task
316 if err := d.view("GetTaskById", func(tx *bolt.Tx) error {
317 value := tasksBucket(tx).Get([]byte(id))
318 if value == nil {
319 return nil
320 }
321 // Only BUCKET_TASKS_VERSION = 1 is implemented right now.
322 // TODO(benjaminwagner): Add functions "pack" and "unpack" that determine
323 // which version to use.
324 _, serialized, err := unpackV1(value)
325 if err != nil {
326 return err
327 }
328 var t db.Task
329 if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&t) ; err != nil {
330 return err
331 }
332 rv = &t
333 return nil
334 }); err != nil {
335 return nil, err
336 }
337 if rv == nil {
338 // Return an error if id is invalid.
339 if _, _, err := parseId(id); err != nil {
340 return nil, err
341 }
342 }
343 return rv, nil
344 }
345
346 // See docs for DB interface.
347 func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error ) {
348 min := []byte(start.Add(-MAX_CREATED_TIME_SKEW).UTC().Format(TIMESTAMP_F ORMAT))
349 max := []byte(end.UTC().Format(TIMESTAMP_FORMAT))
350 decoder := db.TaskDecoder{}
351 if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error {
352 c := tasksBucket(tx).Cursor()
353 for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
354 // Only BUCKET_TASKS_VERSION = 1 is implemented right no w.
355 _, serialized, err := unpackV1(v)
356 if err != nil {
357 return err
358 }
359 cpy := make([]byte, len(serialized))
360 copy(cpy, serialized)
361 if !decoder.Process(cpy) {
362 return nil
363 }
364 }
365 return nil
366 }); err != nil {
367 return nil, err
368 }
369 result, err := decoder.Result()
370 if err != nil {
371 return nil, err
372 }
373 sort.Sort(db.TaskSlice(result))
374 // The Tasks retrieved based on Id timestamp may include Tasks with Crea ted
375 // time before/after the desired range.
376 // TODO(benjaminwagner): Biased binary search might be faster.
377 startIdx := 0
378 for startIdx < len(result) && result[startIdx].Created.Before(start) {
379 startIdx++
380 }
381 endIdx := len(result)
382 for endIdx > 0 && !result[endIdx-1].Created.Before(end) {
383 endIdx--
384 }
385 return result[startIdx:endIdx], nil
386 }
387
388 // See documentation for DB interface.
389 func (d *localDB) PutTask(t *db.Task) error {
390 return d.PutTasks([]*db.Task{t})
391 }
392
393 // validate returns an error if the task can not be inserted into the DB. Does
394 // not modify t.
395 func (d *localDB) validate(t *db.Task) error {
396 if util.TimeIsZero(t.Created) {
397 return fmt.Errorf("Created not set. Task %s created time is %s. %v", t.Id, t.Created, t)
398 }
399 if t.Id != "" {
400 idTs, _, err := parseId(t.Id)
401 if err != nil {
402 return err
403 }
404 if t.Created.Sub(idTs) > MAX_CREATED_TIME_SKEW {
405 return fmt.Errorf("Created too late. Task %s was assigne d Id at %s which is %s before Created time %s, more than MAX_CREATED_TIME_SKEW = %s.", t.Id, idTs, t.Created.Sub(idTs), t.Created, MAX_CREATED_TIME_SKEW)
406 }
407 if t.Created.Before(idTs) {
408 return fmt.Errorf("Created too early. Task %s Created ti me was changed or set to %s after Id assigned at %s.", t.Id, t.Created, idTs)
409 }
410 }
411 return nil
412 }
413
414 // See documentation for DB interface.
415 func (d *localDB) PutTasks(tasks []*db.Task) error {
416 // If there is an error during the transaction, we should leave the task s
417 // unchanged. Save the old Ids and DbModified times since we set them be low.
418 type savedData struct {
419 Id string
420 DbModified time.Time
421 }
422 oldData := make([]savedData, 0, len(tasks))
423 // Validate and save current data.
424 for _, t := range tasks {
425 if err := d.validate(t); err != nil {
426 return err
427 }
428 oldData = append(oldData, savedData{
429 Id: t.Id,
430 DbModified: t.DbModified,
431 })
432 }
433 revertChanges := func() {
434 for i, data := range oldData {
435 tasks[i].Id = data.Id
436 tasks[i].DbModified = data.DbModified
437 }
438 }
439 gobs := make(map[string][]byte, len(tasks))
440 err := d.update("PutTasks", func(tx *bolt.Tx) error {
441 bucket := tasksBucket(tx)
442 // Assign Ids and encode.
443 e := db.TaskEncoder{}
444 now := time.Now().UTC()
445 for _, t := range tasks {
446 if t.Id == "" {
447 if err := d.assignId(tx, t, now); err != nil {
448 return err
449 }
450 } else {
451 if value := bucket.Get([]byte(t.Id)); value != n il {
452 modTs, serialized, err := unpackV1(value )
453 if err != nil {
454 return err
455 }
456 if !modTs.Equal(t.DbModified) {
457 var existing db.Task
458 if err := gob.NewDecoder(bytes.N ewReader(serialized)).Decode(&existing); err != nil {
459 return err
460 }
461 glog.Warningf("Cached Task has b een modified in the DB. Current:\n%#v\nCached:\n%#v", existing, t)
462 return db.ErrConcurrentUpdate
463 }
464 }
465 }
466 t.DbModified = now
467 e.Process(t)
468 }
469 // Insert/update.
470 for {
471 t, serialized, err := e.Next()
472 if err != nil {
473 return err
474 }
475 if t == nil {
476 break
477 }
478 gobs[t.Id] = serialized
479 // BUCKET_TASKS_VERSION = 1
480 value := packV1(now, serialized)
481 if err := bucket.Put([]byte(t.Id), value); err != nil {
482 return err
483 }
484 }
485 return nil
486 })
487 if err != nil {
488 revertChanges()
489 return err
490 } else {
491 d.modTasks.TrackModifiedTasksGOB(gobs)
492 }
493 return nil
494 }
495
496 // See docs for DB interface.
497 func (d *localDB) GetModifiedTasks(id string) ([]*db.Task, error) {
498 return d.modTasks.GetModifiedTasks(id)
499 }
500
501 // See docs for DB interface.
502 func (d *localDB) StartTrackingModifiedTasks() (string, error) {
503 return d.modTasks.StartTrackingModifiedTasks()
504 }
505
506 // See docs for DB interface.
507 func (d *localDB) StopTrackingModifiedTasks(id string) {
508 d.modTasks.StopTrackingModifiedTasks(id)
509 }
510
511 // RunBackupServer runs an HTTP server which provides downloadable database
512 // backups.
513 func (d *localDB) RunBackupServer(port string) error {
514 r := mux.NewRouter()
515 r.HandleFunc("/backup", func(w http.ResponseWriter, r *http.Request) {
516 if err := d.view("Backup", func(tx *bolt.Tx) error {
517 w.Header().Set("Content-Type", "application/octet-stream ")
518 w.Header().Set("Content-Disposition", "attachment; filen ame=\"tasks.db\"")
519 w.Header().Set("Content-Length", strconv.Itoa(int(tx.Siz e())))
520 _, err := tx.WriteTo(w)
521 return err
522 }); err != nil {
523 glog.Errorf("Failed to create DB backup: %s", err)
524 httputils.ReportError(w, r, err, "Failed to create DB ba ckup")
525 }
526 })
527 http.Handle("/", httputils.LoggingGzipRequestResponse(r))
528 return http.ListenAndServe(port, nil)
529 }
OLDNEW
« no previous file with comments | « build_scheduler/go/db/local_db/busywork/main.go ('k') | build_scheduler/go/db/local_db/local_db_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698