OLD | NEW |
| (Empty) |
1 package local_db | |
2 | |
3 import ( | |
4 "bytes" | |
5 "encoding/binary" | |
6 "encoding/gob" | |
7 "fmt" | |
8 "net/http" | |
9 "sort" | |
10 "strconv" | |
11 "strings" | |
12 "sync" | |
13 "time" | |
14 | |
15 "github.com/boltdb/bolt" | |
16 "github.com/gorilla/mux" | |
17 "github.com/skia-dev/glog" | |
18 "go.skia.org/infra/build_scheduler/go/db" | |
19 "go.skia.org/infra/go/boltutil" | |
20 "go.skia.org/infra/go/httputils" | |
21 "go.skia.org/infra/go/metrics2" | |
22 "go.skia.org/infra/go/util" | |
23 ) | |
24 | |
25 const ( | |
26 // BUCKET_TASKS is the name of the Tasks bucket. Key is Task.Id, which i
s set | |
27 // to (creation time, sequence number) (see formatId for detail), value
is | |
28 // described in docs for BUCKET_TASKS_VERSION. Tasks will be updated in
place. | |
29 // All repos share the same bucket. | |
30 BUCKET_TASKS = "tasks" | |
31 // BUCKET_TASKS_FILL_PERCENT is the value to set for bolt.Bucket.FillPer
cent | |
32 // for BUCKET_TASKS. BUCKET_TASKS will be append-mostly, so use a high f
ill | |
33 // percent. | |
34 BUCKET_TASKS_FILL_PERCENT = 0.9 | |
35 // BUCKET_TASKS_VERSION indicates the format of the value of BUCKET_TASK
S | |
36 // written by PutTasks. Retrieving Tasks from the DB must support all pr
evious | |
37 // versions. For all versions, the first byte is the version number. | |
38 // Version 1: v[0] = 1; v[1:9] is the modified time as UnixNano encode
d as | |
39 // big endian; v[9:] is the GOB of the Task. | |
40 BUCKET_TASKS_VERSION = 1 | |
41 | |
42 // TIMESTAMP_FORMAT is a format string passed to Time.Format and time.Pa
rse to | |
43 // format/parse the timestamp in the Task ID. It is similar to | |
44 // util.RFC3339NanoZeroPad, but since Task.Id can not contain colons, we
omit | |
45 // most of the punctuation. This timestamp can only be used to format an
d | |
46 // parse times in UTC. | |
47 TIMESTAMP_FORMAT = "20060102T150405.000000000Z" | |
48 // SEQUENCE_NUMBER_FORMAT is a format string passed to fmt.Sprintf or | |
49 // fmt.Sscanf to format/parse the sequence number in the Task ID. It is
a | |
50 // 16-digit zero-padded lowercase hexidecimal number. | |
51 SEQUENCE_NUMBER_FORMAT = "%016x" | |
52 | |
53 // MAX_CREATED_TIME_SKEW is the maximum difference between the timestamp
in a | |
54 // Task's Id field and that Task's Created field. This allows AssignId t
o be | |
55 // called before creating the Swarming task so that the Id can be includ
ed in | |
56 // the Swarming task tags. GetTasksFromDateRange accounts for this skew
when | |
57 // retrieving tasks. This value can be increased in the future, but can
never | |
58 // be decreased. | |
59 // | |
60 // 6 minutes is based on httputils.DIAL_TIMEOUT + httputils.REQUEST_TIME
OUT, | |
61 // which is assumed to be the approximate maximum duration of a successf
ul | |
62 // swarming.ApiClient.TriggerTask() call. | |
63 MAX_CREATED_TIME_SKEW = 6 * time.Minute | |
64 ) | |
65 | |
66 // formatId returns the timestamp and sequence number formatted for a Task ID. | |
67 // Format is "<timestamp>_<sequence_num>", where the timestamp is formatted | |
68 // using TIMESTAMP_FORMAT and sequence_num is formatted using | |
69 // SEQUENCE_NUMBER_FORMAT. | |
70 func formatId(t time.Time, seq uint64) string { | |
71 t = t.UTC() | |
72 return fmt.Sprintf("%s_"+SEQUENCE_NUMBER_FORMAT, t.Format(TIMESTAMP_FORM
AT), seq) | |
73 } | |
74 | |
75 // parseId returns the timestamp and sequence number stored in a Task ID. | |
76 func parseId(id string) (time.Time, uint64, error) { | |
77 parts := strings.Split(id, "_") | |
78 if len(parts) != 2 { | |
79 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id) | |
80 } | |
81 t, err := time.Parse(TIMESTAMP_FORMAT, parts[0]) | |
82 if err != nil { | |
83 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e
rr) | |
84 } | |
85 var seq uint64 | |
86 // Add newlines to force Sscanf to match the entire string. Otherwise | |
87 // "123hello" will be parsed as 123. Note that Sscanf does not require 1
6 | |
88 // digits even though SEQUENCE_NUMBER_FORMAT specifies padding to 16 dig
its. | |
89 i, err := fmt.Sscanf(parts[1]+"\n", SEQUENCE_NUMBER_FORMAT+"\n", &seq) | |
90 if err != nil { | |
91 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e
rr) | |
92 } else if i != 1 { | |
93 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected o
ne hex number in %s, got %d", id, parts[1], i) | |
94 } | |
95 return t, seq, nil | |
96 } | |
97 | |
98 // packV1 creates a value as described for BUCKET_TASKS_VERSION = 1. t is the | |
99 // modified time and serialized is the GOB of the Task. | |
100 func packV1(t time.Time, serialized []byte) []byte { | |
101 rv := make([]byte, len(serialized)+9) | |
102 rv[0] = 1 | |
103 binary.BigEndian.PutUint64(rv[1:9], uint64(t.UnixNano())) | |
104 copy(rv[9:], serialized) | |
105 return rv | |
106 } | |
107 | |
108 // unpackV1 gets the modified time and GOB of the Task from a value as described | |
109 // by BUCKET_TASKS_VERSION = 1. The returned GOB shares structure with value. | |
110 func unpackV1(value []byte) (time.Time, []byte, error) { | |
111 if len(value) < 9 { | |
112 return time.Time{}, nil, fmt.Errorf("unpackV1 value is too short
(%d bytes)", len(value)) | |
113 } | |
114 if value[0] != 1 { | |
115 return time.Time{}, nil, fmt.Errorf("unpackV1 called for value w
ith version %d", value[0]) | |
116 } | |
117 t := time.Unix(0, int64(binary.BigEndian.Uint64(value[1:9]))).UTC() | |
118 return t, value[9:], nil | |
119 } | |
120 | |
121 // localDB accesses a local BoltDB database containing tasks. | |
122 type localDB struct { | |
123 // name is used in logging and metrics to identify this DB. | |
124 name string | |
125 | |
126 // db is the underlying BoltDB. | |
127 db *bolt.DB | |
128 | |
129 // tx fields contain metrics on the number of active transactions. Prote
cted | |
130 // by txMutex. | |
131 txCount *metrics2.Counter | |
132 txNextId int64 | |
133 txActive map[int64]string | |
134 txMutex sync.RWMutex | |
135 | |
136 dbMetric *boltutil.DbMetric | |
137 | |
138 modTasks db.ModifiedTasks | |
139 | |
140 // Close will send on each of these channels to indicate goroutines shou
ld | |
141 // stop. | |
142 notifyOnClose []chan bool | |
143 } | |
144 | |
145 // startTx monitors when a transaction starts. | |
146 func (d *localDB) startTx(name string) int64 { | |
147 d.txMutex.Lock() | |
148 defer d.txMutex.Unlock() | |
149 d.txCount.Inc(1) | |
150 id := d.txNextId | |
151 d.txActive[id] = name | |
152 d.txNextId++ | |
153 return id | |
154 } | |
155 | |
156 // endTx monitors when a transaction ends. | |
157 func (d *localDB) endTx(id int64) { | |
158 d.txMutex.Lock() | |
159 defer d.txMutex.Unlock() | |
160 d.txCount.Dec(1) | |
161 delete(d.txActive, id) | |
162 } | |
163 | |
164 // reportActiveTx prints out the list of active transactions. | |
165 func (d *localDB) reportActiveTx() { | |
166 d.txMutex.RLock() | |
167 defer d.txMutex.RUnlock() | |
168 if len(d.txActive) == 0 { | |
169 glog.Infof("%s Active Transactions: (none)", d.name) | |
170 return | |
171 } | |
172 txs := make([]string, 0, len(d.txActive)) | |
173 for id, name := range d.txActive { | |
174 txs = append(txs, fmt.Sprintf(" %d\t%s", id, name)) | |
175 } | |
176 glog.Infof("%s Active Transactions:\n%s", d.name, strings.Join(txs, "\n"
)) | |
177 } | |
178 | |
179 // tx is a wrapper for a BoltDB transaction which tracks statistics. | |
180 func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error { | |
181 txId := d.startTx(name) | |
182 defer d.endTx(txId) | |
183 defer metrics2.NewTimer("db-tx-duration", map[string]string{ | |
184 "database": d.name, | |
185 "transaction": name, | |
186 }).Stop() | |
187 if update { | |
188 return d.db.Update(fn) | |
189 } else { | |
190 return d.db.View(fn) | |
191 } | |
192 } | |
193 | |
194 // view is a wrapper for the BoltDB instance's View method. | |
195 func (d *localDB) view(name string, fn func(*bolt.Tx) error) error { | |
196 return d.tx(name, fn, false) | |
197 } | |
198 | |
199 // update is a wrapper for the BoltDB instance's Update method. | |
200 func (d *localDB) update(name string, fn func(*bolt.Tx) error) error { | |
201 return d.tx(name, fn, true) | |
202 } | |
203 | |
204 // Returns the tasks bucket with FillPercent set. | |
205 func tasksBucket(tx *bolt.Tx) *bolt.Bucket { | |
206 b := tx.Bucket([]byte(BUCKET_TASKS)) | |
207 b.FillPercent = BUCKET_TASKS_FILL_PERCENT | |
208 return b | |
209 } | |
210 | |
211 // NewDB returns a local DB instance. | |
212 func NewDB(name, filename string) (db.DB, error) { | |
213 boltdb, err := bolt.Open(filename, 0600, nil) | |
214 if err != nil { | |
215 return nil, err | |
216 } | |
217 d := &localDB{ | |
218 name: name, | |
219 db: boltdb, | |
220 txCount: metrics2.GetCounter("db-active-tx", map[string]string{ | |
221 "database": name, | |
222 }), | |
223 txNextId: 0, | |
224 txActive: map[int64]string{}, | |
225 } | |
226 | |
227 stopReportActiveTx := make(chan bool) | |
228 d.notifyOnClose = append(d.notifyOnClose, stopReportActiveTx) | |
229 go func() { | |
230 t := time.NewTicker(time.Minute) | |
231 for { | |
232 select { | |
233 case <-stopReportActiveTx: | |
234 t.Stop() | |
235 return | |
236 case <-t.C: | |
237 d.reportActiveTx() | |
238 } | |
239 } | |
240 }() | |
241 | |
242 if err := d.update("NewDB", func(tx *bolt.Tx) error { | |
243 if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS)); e
rr != nil { | |
244 return err | |
245 } | |
246 return nil | |
247 }); err != nil { | |
248 return nil, err | |
249 } | |
250 | |
251 if dbMetric, err := boltutil.NewDbMetric(boltdb, []string{BUCKET_TASKS},
map[string]string{"database": name}); err != nil { | |
252 return nil, err | |
253 } else { | |
254 d.dbMetric = dbMetric | |
255 } | |
256 | |
257 return d, nil | |
258 } | |
259 | |
260 // See docs for DB interface. | |
261 func (d *localDB) Close() error { | |
262 d.txMutex.Lock() | |
263 defer d.txMutex.Unlock() | |
264 if len(d.txActive) > 0 { | |
265 return fmt.Errorf("Can not close DB when transactions are active
.") | |
266 } | |
267 for _, c := range d.notifyOnClose { | |
268 c <- true | |
269 } | |
270 d.txActive = map[int64]string{} | |
271 if err := d.dbMetric.Delete(); err != nil { | |
272 return err | |
273 } | |
274 d.dbMetric = nil | |
275 if err := d.txCount.Delete(); err != nil { | |
276 return err | |
277 } | |
278 d.txCount = nil | |
279 return d.db.Close() | |
280 } | |
281 | |
282 // Sets t.Id either based on t.Created or now. tx must be an update transaction. | |
283 func (d *localDB) assignId(tx *bolt.Tx, t *db.Task, now time.Time) error { | |
284 if t.Id != "" { | |
285 return fmt.Errorf("Task Id already assigned: %v", t.Id) | |
286 } | |
287 ts := now | |
288 if !util.TimeIsZero(t.Created) { | |
289 // TODO(benjaminwagner): Disallow assigning IDs based on t.Creat
ed; or | |
290 // ensure t.Created is > any ID ts in the DB. | |
291 ts = t.Created | |
292 } | |
293 seq, err := tasksBucket(tx).NextSequence() | |
294 if err != nil { | |
295 return err | |
296 } | |
297 t.Id = formatId(ts, seq) | |
298 return nil | |
299 } | |
300 | |
301 // See docs for DB interface. | |
302 func (d *localDB) AssignId(t *db.Task) error { | |
303 oldId := t.Id | |
304 err := d.update("AssignId", func(tx *bolt.Tx) error { | |
305 return d.assignId(tx, t, time.Now()) | |
306 }) | |
307 if err != nil { | |
308 t.Id = oldId | |
309 } | |
310 return err | |
311 } | |
312 | |
313 // See docs for DB interface. | |
314 func (d *localDB) GetTaskById(id string) (*db.Task, error) { | |
315 var rv *db.Task | |
316 if err := d.view("GetTaskById", func(tx *bolt.Tx) error { | |
317 value := tasksBucket(tx).Get([]byte(id)) | |
318 if value == nil { | |
319 return nil | |
320 } | |
321 // Only BUCKET_TASKS_VERSION = 1 is implemented right now. | |
322 // TODO(benjaminwagner): Add functions "pack" and "unpack" that
determine | |
323 // which version to use. | |
324 _, serialized, err := unpackV1(value) | |
325 if err != nil { | |
326 return err | |
327 } | |
328 var t db.Task | |
329 if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&t)
; err != nil { | |
330 return err | |
331 } | |
332 rv = &t | |
333 return nil | |
334 }); err != nil { | |
335 return nil, err | |
336 } | |
337 if rv == nil { | |
338 // Return an error if id is invalid. | |
339 if _, _, err := parseId(id); err != nil { | |
340 return nil, err | |
341 } | |
342 } | |
343 return rv, nil | |
344 } | |
345 | |
346 // See docs for DB interface. | |
347 func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error
) { | |
348 min := []byte(start.Add(-MAX_CREATED_TIME_SKEW).UTC().Format(TIMESTAMP_F
ORMAT)) | |
349 max := []byte(end.UTC().Format(TIMESTAMP_FORMAT)) | |
350 decoder := db.TaskDecoder{} | |
351 if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error { | |
352 c := tasksBucket(tx).Cursor() | |
353 for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0;
k, v = c.Next() { | |
354 // Only BUCKET_TASKS_VERSION = 1 is implemented right no
w. | |
355 _, serialized, err := unpackV1(v) | |
356 if err != nil { | |
357 return err | |
358 } | |
359 cpy := make([]byte, len(serialized)) | |
360 copy(cpy, serialized) | |
361 if !decoder.Process(cpy) { | |
362 return nil | |
363 } | |
364 } | |
365 return nil | |
366 }); err != nil { | |
367 return nil, err | |
368 } | |
369 result, err := decoder.Result() | |
370 if err != nil { | |
371 return nil, err | |
372 } | |
373 sort.Sort(db.TaskSlice(result)) | |
374 // The Tasks retrieved based on Id timestamp may include Tasks with Crea
ted | |
375 // time before/after the desired range. | |
376 // TODO(benjaminwagner): Biased binary search might be faster. | |
377 startIdx := 0 | |
378 for startIdx < len(result) && result[startIdx].Created.Before(start) { | |
379 startIdx++ | |
380 } | |
381 endIdx := len(result) | |
382 for endIdx > 0 && !result[endIdx-1].Created.Before(end) { | |
383 endIdx-- | |
384 } | |
385 return result[startIdx:endIdx], nil | |
386 } | |
387 | |
388 // See documentation for DB interface. | |
389 func (d *localDB) PutTask(t *db.Task) error { | |
390 return d.PutTasks([]*db.Task{t}) | |
391 } | |
392 | |
393 // validate returns an error if the task can not be inserted into the DB. Does | |
394 // not modify t. | |
395 func (d *localDB) validate(t *db.Task) error { | |
396 if util.TimeIsZero(t.Created) { | |
397 return fmt.Errorf("Created not set. Task %s created time is %s.
%v", t.Id, t.Created, t) | |
398 } | |
399 if t.Id != "" { | |
400 idTs, _, err := parseId(t.Id) | |
401 if err != nil { | |
402 return err | |
403 } | |
404 if t.Created.Sub(idTs) > MAX_CREATED_TIME_SKEW { | |
405 return fmt.Errorf("Created too late. Task %s was assigne
d Id at %s which is %s before Created time %s, more than MAX_CREATED_TIME_SKEW =
%s.", t.Id, idTs, t.Created.Sub(idTs), t.Created, MAX_CREATED_TIME_SKEW) | |
406 } | |
407 if t.Created.Before(idTs) { | |
408 return fmt.Errorf("Created too early. Task %s Created ti
me was changed or set to %s after Id assigned at %s.", t.Id, t.Created, idTs) | |
409 } | |
410 } | |
411 return nil | |
412 } | |
413 | |
414 // See documentation for DB interface. | |
415 func (d *localDB) PutTasks(tasks []*db.Task) error { | |
416 // If there is an error during the transaction, we should leave the task
s | |
417 // unchanged. Save the old Ids and DbModified times since we set them be
low. | |
418 type savedData struct { | |
419 Id string | |
420 DbModified time.Time | |
421 } | |
422 oldData := make([]savedData, 0, len(tasks)) | |
423 // Validate and save current data. | |
424 for _, t := range tasks { | |
425 if err := d.validate(t); err != nil { | |
426 return err | |
427 } | |
428 oldData = append(oldData, savedData{ | |
429 Id: t.Id, | |
430 DbModified: t.DbModified, | |
431 }) | |
432 } | |
433 revertChanges := func() { | |
434 for i, data := range oldData { | |
435 tasks[i].Id = data.Id | |
436 tasks[i].DbModified = data.DbModified | |
437 } | |
438 } | |
439 gobs := make(map[string][]byte, len(tasks)) | |
440 err := d.update("PutTasks", func(tx *bolt.Tx) error { | |
441 bucket := tasksBucket(tx) | |
442 // Assign Ids and encode. | |
443 e := db.TaskEncoder{} | |
444 now := time.Now().UTC() | |
445 for _, t := range tasks { | |
446 if t.Id == "" { | |
447 if err := d.assignId(tx, t, now); err != nil { | |
448 return err | |
449 } | |
450 } else { | |
451 if value := bucket.Get([]byte(t.Id)); value != n
il { | |
452 modTs, serialized, err := unpackV1(value
) | |
453 if err != nil { | |
454 return err | |
455 } | |
456 if !modTs.Equal(t.DbModified) { | |
457 var existing db.Task | |
458 if err := gob.NewDecoder(bytes.N
ewReader(serialized)).Decode(&existing); err != nil { | |
459 return err | |
460 } | |
461 glog.Warningf("Cached Task has b
een modified in the DB. Current:\n%#v\nCached:\n%#v", existing, t) | |
462 return db.ErrConcurrentUpdate | |
463 } | |
464 } | |
465 } | |
466 t.DbModified = now | |
467 e.Process(t) | |
468 } | |
469 // Insert/update. | |
470 for { | |
471 t, serialized, err := e.Next() | |
472 if err != nil { | |
473 return err | |
474 } | |
475 if t == nil { | |
476 break | |
477 } | |
478 gobs[t.Id] = serialized | |
479 // BUCKET_TASKS_VERSION = 1 | |
480 value := packV1(now, serialized) | |
481 if err := bucket.Put([]byte(t.Id), value); err != nil { | |
482 return err | |
483 } | |
484 } | |
485 return nil | |
486 }) | |
487 if err != nil { | |
488 revertChanges() | |
489 return err | |
490 } else { | |
491 d.modTasks.TrackModifiedTasksGOB(gobs) | |
492 } | |
493 return nil | |
494 } | |
495 | |
496 // See docs for DB interface. | |
497 func (d *localDB) GetModifiedTasks(id string) ([]*db.Task, error) { | |
498 return d.modTasks.GetModifiedTasks(id) | |
499 } | |
500 | |
501 // See docs for DB interface. | |
502 func (d *localDB) StartTrackingModifiedTasks() (string, error) { | |
503 return d.modTasks.StartTrackingModifiedTasks() | |
504 } | |
505 | |
506 // See docs for DB interface. | |
507 func (d *localDB) StopTrackingModifiedTasks(id string) { | |
508 d.modTasks.StopTrackingModifiedTasks(id) | |
509 } | |
510 | |
511 // RunBackupServer runs an HTTP server which provides downloadable database | |
512 // backups. | |
513 func (d *localDB) RunBackupServer(port string) error { | |
514 r := mux.NewRouter() | |
515 r.HandleFunc("/backup", func(w http.ResponseWriter, r *http.Request) { | |
516 if err := d.view("Backup", func(tx *bolt.Tx) error { | |
517 w.Header().Set("Content-Type", "application/octet-stream
") | |
518 w.Header().Set("Content-Disposition", "attachment; filen
ame=\"tasks.db\"") | |
519 w.Header().Set("Content-Length", strconv.Itoa(int(tx.Siz
e()))) | |
520 _, err := tx.WriteTo(w) | |
521 return err | |
522 }); err != nil { | |
523 glog.Errorf("Failed to create DB backup: %s", err) | |
524 httputils.ReportError(w, r, err, "Failed to create DB ba
ckup") | |
525 } | |
526 }) | |
527 http.Handle("/", httputils.LoggingGzipRequestResponse(r)) | |
528 return http.ListenAndServe(port, nil) | |
529 } | |
OLD | NEW |