Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(206)

Side by Side Diff: build_scheduler/go/db/local_db/local_db.go

Issue 2246933002: Add Task DB implementation using a local BoltDB. (Closed) Base URL: https://skia.googlesource.com/buildbot@taskdb-impl-track
Patch Set: Fix bad merge. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « build_scheduler/go/db/db_test.go ('k') | build_scheduler/go/db/local_db/local_db_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 package local_db
2
3 import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "net/http"
8 "strconv"
9 "strings"
10 "sync"
11 "time"
12
13 "github.com/boltdb/bolt"
14 "github.com/gorilla/mux"
15 "github.com/skia-dev/glog"
16 "go.skia.org/infra/build_scheduler/go/db"
17 "go.skia.org/infra/go/httputils"
18 "go.skia.org/infra/go/metrics2"
19 "go.skia.org/infra/go/util"
20 )
21
// Storage layout of the tasks bucket.
const (
	// BUCKET_TASKS names the single bucket that stores every Task. Each key is
	// a Task.Id, which is set to (creation time, sequence number) (see
	// formatId for detail); each value is the GOB of the task. Tasks are
	// updated in place, and all repos share this bucket.
	// TODO(benjaminwagner): May need to prefix value with metadata.
	BUCKET_TASKS = "tasks"
	// BUCKET_TASKS_FILL_PERCENT is the fill percent used for BUCKET_TASKS. The
	// bucket is append-mostly, so a high value is used.
	BUCKET_TASKS_FILL_PERCENT = 0.9
)

// Encoding of the two halves of a Task ID.
const (
	// TIMESTAMP_FORMAT is the layout handed to Time.Format and time.Parse for
	// the timestamp half of a Task ID. It resembles util.RFC3339NanoZeroPad,
	// but most punctuation is dropped because Task.Id can not contain colons.
	// It is only suitable for formatting and parsing times in UTC.
	TIMESTAMP_FORMAT = "20060102T150405.000000000Z"
	// SEQUENCE_NUMBER_FORMAT is the verb handed to fmt.Sprintf or fmt.Sscanf
	// for the sequence-number half of a Task ID: a 16-digit zero-padded
	// lowercase hexadecimal number.
	SEQUENCE_NUMBER_FORMAT = "%016x"
)
42
43 // formatId returns the timestamp and sequence number formatted for a Task ID.
44 // Format is "<timestamp>_<sequence_num>", where the timestamp is formatted
45 // using TIMESTAMP_FORMAT and sequence_num is formatted using
46 // SEQUENCE_NUMBER_FORMAT.
47 func formatId(t time.Time, seq uint64) string {
48 t = t.UTC()
49 return fmt.Sprintf("%s_"+SEQUENCE_NUMBER_FORMAT, t.Format(TIMESTAMP_FORM AT), seq)
50 }
51
52 // parseId returns the timestamp and sequence number stored in a Task ID.
53 func parseId(id string) (time.Time, uint64, error) {
54 parts := strings.Split(id, "_")
55 if len(parts) != 2 {
56 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id)
57 }
58 t, err := time.Parse(TIMESTAMP_FORMAT, parts[0])
59 if err != nil {
60 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr)
61 }
62 var seq uint64
63 // Add newlines to force Sscanf to match the entire string. Otherwise
64 // "123hello" will be parsed as 123. Note that Sscanf does not require 1 6
65 // digits even though SEQUENCE_NUMBER_FORMAT specifies padding to 16 dig its.
66 i, err := fmt.Sscanf(parts[1]+"\n", SEQUENCE_NUMBER_FORMAT+"\n", &seq)
67 if err != nil {
68 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr)
69 } else if i != 1 {
70 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected o ne hex number in %s, got %d", id, parts[1], i)
71 }
72 return t, seq, nil
73 }
74
75 // localDB accesses a local BoltDB database containing tasks.
76 type localDB struct {
77 // name is used in logging and metrics to identify this DB.
78 name string
79
80 // db is the underlying BoltDB.
81 db *bolt.DB
82
83 // tx fields contain metrics on the number of active transactions. Prote cted
84 // by txMutex.
85 txCount *metrics2.Counter
86 txNextId int64
87 txActive map[int64]string
88 txMutex sync.RWMutex
89
90 modTasks db.ModifiedTasks
91 }
92
93 // startTx monitors when a transaction starts.
94 func (d *localDB) startTx(name string) int64 {
95 d.txMutex.Lock()
96 defer d.txMutex.Unlock()
97 d.txCount.Inc(1)
98 id := d.txNextId
99 d.txActive[id] = name
100 d.txNextId++
101 return id
102 }
103
104 // endTx monitors when a transaction ends.
105 func (d *localDB) endTx(id int64) {
106 d.txMutex.Lock()
107 defer d.txMutex.Unlock()
108 d.txCount.Dec(1)
109 delete(d.txActive, id)
110 }
111
112 // reportActiveTx prints out the list of active transactions.
113 func (d *localDB) reportActiveTx() {
114 d.txMutex.RLock()
115 defer d.txMutex.RUnlock()
116 if len(d.txActive) == 0 {
117 glog.Infof("%s Active Transactions: (none)", d.name)
118 return
119 }
120 txs := make([]string, 0, len(d.txActive))
121 for id, name := range d.txActive {
122 txs = append(txs, fmt.Sprintf(" %d\t%s", id, name))
123 }
124 glog.Infof("%s Active Transactions:\n%s", d.name, strings.Join(txs, "\n" ))
125 }
126
127 // tx is a wrapper for a BoltDB transaction which tracks statistics.
128 func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error {
129 txId := d.startTx(name)
130 defer d.endTx(txId)
131 defer metrics2.NewTimer("db-tx-duration", map[string]string{
132 "database": d.name,
133 "transaction": name,
134 }).Stop()
135 if update {
136 return d.db.Update(fn)
137 } else {
138 return d.db.View(fn)
139 }
140 }
141
142 // view is a wrapper for the BoltDB instance's View method.
143 func (d *localDB) view(name string, fn func(*bolt.Tx) error) error {
144 return d.tx(name, fn, false)
145 }
146
147 // update is a wrapper for the BoltDB instance's Update method.
148 func (d *localDB) update(name string, fn func(*bolt.Tx) error) error {
149 return d.tx(name, fn, true)
150 }
151
152 // Returns the tasks bucket with FillPercent set.
153 func tasksBucket(tx *bolt.Tx) *bolt.Bucket {
154 b := tx.Bucket([]byte(BUCKET_TASKS))
155 b.FillPercent = BUCKET_TASKS_FILL_PERCENT
156 return b
157 }
158
159 // NewDB returns a local DB instance.
160 func NewDB(name, filename string) (db.DB, error) {
161 boltdb, err := bolt.Open(filename, 0600, nil)
162 if err != nil {
163 return nil, err
164 }
165 d := &localDB{
166 name: name,
167 db: boltdb,
168 txCount: metrics2.GetCounter("db-active-tx", map[string]string{
169 "database": name,
170 }),
171 txNextId: 0,
172 txActive: map[int64]string{},
173 }
174 go func() {
175 for _ = range time.Tick(time.Minute) {
176 d.reportActiveTx()
177 }
178 }()
179
180 if err := d.update("NewDB", func(tx *bolt.Tx) error {
181 if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS)); e rr != nil {
182 return err
183 }
184 return nil
185 }); err != nil {
186 return nil, err
187 }
188
189 return d, nil
190 }
191
192 // See docs for DB interface.
193 func (d *localDB) Close() error {
194 d.txMutex.Lock()
195 defer d.txMutex.Unlock()
196 if len(d.txActive) > 0 {
197 return fmt.Errorf("Can not close DB when transactions are active .")
198 }
199 // TODO(benjaminwagner): Make this work.
borenet 2016/08/17 13:51:13 What's not working?
dogben 2016/08/17 13:57:58 Actually, it would be great if you can take a look
200 //if err := d.txCount.Delete(); err != nil {
201 // return err
202 //}
203 d.txActive = map[int64]string{}
204 return d.db.Close()
205 }
206
207 // Sets t.Id either based on t.Created or now. tx must be an update transaction.
208 func (d *localDB) assignId(tx *bolt.Tx, t *db.Task, now time.Time) error {
209 if t.Id != "" {
210 return fmt.Errorf("Task Id already assigned: %v", t.Id)
211 }
212 ts := now
213 if !util.TimeIsZero(t.Created) {
214 ts = t.Created
215 }
216 seq, err := tasksBucket(tx).NextSequence()
217 if err != nil {
218 return err
219 }
220 t.Id = formatId(ts, seq)
221 return nil
222 }
223
224 // See docs for DB interface.
225 func (d *localDB) AssignId(t *db.Task) error {
226 oldId := t.Id
227 err := d.update("AssignId", func(tx *bolt.Tx) error {
228 return d.assignId(tx, t, time.Now())
229 })
230 if err != nil {
231 t.Id = oldId
232 }
233 return err
234 }
235
236 // See docs for DB interface.
237 func (d *localDB) GetTaskById(id string) (*db.Task, error) {
238 var rv *db.Task
239 if err := d.view("GetTaskById", func(tx *bolt.Tx) error {
240 serialized := tasksBucket(tx).Get([]byte(id))
241 if serialized == nil {
242 return nil
243 }
244 var t db.Task
245 if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&t) ; err != nil {
246 return err
247 }
248 rv = &t
249 return nil
250 }); err != nil {
251 return nil, err
252 }
253 if rv == nil {
254 // Return an error if id is invalid.
255 if _, _, err := parseId(id); err != nil {
256 return nil, err
257 }
258 }
259 return rv, nil
260 }
261
262 // See docs for DB interface.
263 // TODO(benjaminwagner): Filter Tasks based on Task.Created rather than Task.Id.
264 func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error ) {
265 min := []byte(start.UTC().Format(TIMESTAMP_FORMAT))
266 max := []byte(end.UTC().Format(TIMESTAMP_FORMAT))
267 decoder := db.TaskDecoder{}
268 if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error {
269 c := tasksBucket(tx).Cursor()
270 for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
271 cpy := make([]byte, len(v))
272 copy(cpy, v)
273 if !decoder.Process(cpy) {
274 return nil
275 }
276 }
277 return nil
278 }); err != nil {
279 return nil, err
280 }
281 return decoder.Result()
282 }
283
284 // See documentation for DB interface.
285 func (d *localDB) PutTask(t *db.Task) error {
286 return d.PutTasks([]*db.Task{t})
287 }
288
289 // validate returns an error if the task can not be inserted into the DB. Does
290 // not modify t.
291 func (d *localDB) validate(t *db.Task) error {
292 // TODO(benjaminwagner): Check skew between t.Id (if assigned) and t.Cre ated.
293 return nil
294 }
295
296 // See documentation for DB interface.
297 // TODO(benjaminwagner): Figure out how to detect write/write conflicts and
298 // return "concurrent modification" error.
299 func (d *localDB) PutTasks(tasks []*db.Task) error {
300 // If there is an error during the transaction, we should leave the task s
301 // unchanged. Save the old Ids since we set them below.
302 oldIds := make([]string, len(tasks))
303 // Validate and save current Ids.
304 for _, t := range tasks {
305 if err := d.validate(t); err != nil {
306 return err
307 }
308 oldIds = append(oldIds, t.Id)
309 }
310 revertChanges := func() {
311 for i, oldId := range oldIds {
312 tasks[i].Id = oldId
313 }
314 }
315 err := d.update("PutTasks", func(tx *bolt.Tx) error {
316 // Assign Ids and encode.
317 e := db.TaskEncoder{}
318 now := time.Now()
319 for _, t := range tasks {
320 if t.Id == "" {
321 if err := d.assignId(tx, t, now); err != nil {
322 return err
323 }
324 }
325 e.Process(t)
326 }
327 // Insert/update.
328 for {
329 t, serialized, err := e.Next()
330 if err != nil {
331 return err
332 }
333 if t == nil {
334 break
335 }
336 if err := tasksBucket(tx).Put([]byte(t.Id), serialized); err != nil {
337 return err
338 }
339 }
340 return nil
341 })
342 if err != nil {
343 revertChanges()
344 return err
345 } else {
346 // TODO(benjaminwagner): pass serialized bytes.
347 d.modTasks.TrackModifiedTasks(tasks)
348 }
349 return nil
350 }
351
352 // See docs for DB interface.
353 func (d *localDB) GetModifiedTasks(id string) ([]*db.Task, error) {
354 return d.modTasks.GetModifiedTasks(id)
355 }
356
357 // See docs for DB interface.
358 func (d *localDB) StartTrackingModifiedTasks() (string, error) {
359 return d.modTasks.StartTrackingModifiedTasks()
360 }
361
362 // Returns the total number of tasks in the DB.
363 // TODO(benjaminwagner): add a metrics goroutine.
364 func (d *localDB) NumTasks() (int, error) {
365 var n int
366 if err := d.view("NumTasks", func(tx *bolt.Tx) error {
367 n = tasksBucket(tx).Stats().KeyN
368 return nil
369 }); err != nil {
370 return -1, err
371 }
372 return n, nil
373 }
374
375 // RunBackupServer runs an HTTP server which provides downloadable database
376 // backups.
377 func (d *localDB) RunBackupServer(port string) error {
378 r := mux.NewRouter()
379 r.HandleFunc("/backup", func(w http.ResponseWriter, r *http.Request) {
380 if err := d.view("Backup", func(tx *bolt.Tx) error {
381 w.Header().Set("Content-Type", "application/octet-stream ")
382 w.Header().Set("Content-Disposition", "attachment; filen ame=\"tasks.db\"")
383 w.Header().Set("Content-Length", strconv.Itoa(int(tx.Siz e())))
384 _, err := tx.WriteTo(w)
385 return err
386 }); err != nil {
387 glog.Errorf("Failed to create DB backup: %s", err)
388 httputils.ReportError(w, r, err, "Failed to create DB ba ckup")
389 }
390 })
391 http.Handle("/", httputils.LoggingGzipRequestResponse(r))
392 return http.ListenAndServe(port, nil)
393 }
OLDNEW
« no previous file with comments | « build_scheduler/go/db/db_test.go ('k') | build_scheduler/go/db/local_db/local_db_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698