Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(468)

Side by Side Diff: build_scheduler/go/db/local_db/local_db.go

Issue 2246933002: Add Task DB implementation using a local BoltDB. (Closed) Base URL: https://skia.googlesource.com/buildbot@taskdb-impl-track
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « build_scheduler/go/db/db_test.go ('k') | build_scheduler/go/db/local_db/local_db_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 package local_db
2
3 import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "net/http"
8 "strconv"
9 "strings"
10 "sync"
11 "time"
12
13 "github.com/boltdb/bolt"
14 "github.com/gorilla/mux"
15 "github.com/skia-dev/glog"
16 "go.skia.org/infra/build_scheduler/go/db"
17 "go.skia.org/infra/go/httputils"
18 "go.skia.org/infra/go/metrics2"
19 "go.skia.org/infra/go/util"
20 )
21
22 const (
23 // Tasks. Key is Task.Id, which is set to (creation time, sequence numbe r)
24 // (see formatId for detail), value is the GOB of the task. Tasks will b e
25 // updated in place. All repos share the same bucket.
26 // TODO(benjaminwagner): May need to prefix value with metadata.
27 BUCKET_TASKS = "tasks"
28 // BUCKET_TASKS will be append-mostly, so use a high fill percent.
29 BUCKET_TASKS_FILL_PERCENT = 0.9
30
31 // Similar to util.RFC3339NanoZeroPad, but since Task.Id can not contain
32 // colons, we omit most of the punctuation. This timestamp can only be u sed to
33 // format and parse times in UTC.
34 TIMESTAMP_FORMAT = "20060102T150405.000000000Z"
35 )
36
37 // formatId returns the timestamp and sequence number formatted for a Task ID.
38 // Format is "<encoded_timestamp>_<encoded_sequence_num>", where:
39 // - encoded_timestamp is the timestamp formatted using TIMESTAMP_FORMAT
40 // - encoded_sequence_num is
41 // "<lexical_ordering_char><hexidecimal_sequence_number>"
42 // - lexical_ordering_char is len(hexidecimal_sequence_number)-1 encoded as a
43 // hexidecimal digit, to allow lexical ordering of IDs.
44 // - hexidecimal_sequence_number is the sequence number formatted in
45 // hexidecimal digits with no padding.
46 // (Note that since the sequence number is uint64, the maximum possible value of
47 // len(hexidecimal_sequence_number)-1 is 15, so it can always be encoded as a
48 // single hexidecimal digit.)
borenet 2016/08/16 19:49:54 Why not just have this function zero-pad the seque
dogben 2016/08/17 13:29:44 You're right. I thought it was overkill to have th
49 func formatId(t time.Time, seq uint64) string {
50 t = t.UTC()
51 h := fmt.Sprintf("%x", seq)
52 return fmt.Sprintf("%s_%x%s", t.Format(TIMESTAMP_FORMAT), len(h)-1, h)
53 }
54
55 // parseId returns the timestamp and sequence number stored in a Task ID.
56 func parseId(id string) (time.Time, uint64, error) {
57 parts := strings.Split(id, "_")
58 if len(parts) != 2 {
59 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id)
60 }
61 t, err := time.Parse(TIMESTAMP_FORMAT, parts[0])
62 if err != nil {
63 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr)
64 }
65 var seq uint64
66 // Skip the first character (lexicographic ordering character).
67 if len(parts[1]) < 2 {
68 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected a t least 2 hex digits in second part.", id)
69 }
70 i, err := fmt.Sscanf(parts[1][1:], "%x", &seq)
71 if err != nil {
72 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr)
73 } else if i != 1 {
74 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected o ne hex number in %s, got %d", id, parts[1][1:], i)
75 }
76 return t, seq, nil
77 }
78
79 // localDB accesses a local BoltDB database containing tasks.
80 type localDB struct {
81 // name is used in logging and metrics to identify this DB.
82 name string
83
84 // db is the underlying BoltDB.
85 db *bolt.DB
86
87 // tx fields contain metrics on the number of active transactions. Prote cted
88 // by txMutex.
89 txCount *metrics2.Counter
90 txNextId int64
91 txActive map[int64]string
92 txMutex sync.RWMutex
93
94 modTasks db.ModifiedTasks
95 }
96
97 // startTx monitors when a transaction starts.
98 func (d *localDB) startTx(name string) int64 {
99 d.txMutex.Lock()
100 defer d.txMutex.Unlock()
101 d.txCount.Inc(1)
102 id := d.txNextId
103 d.txActive[id] = name
104 d.txNextId++
105 return id
106 }
107
108 // endTx monitors when a transaction ends.
109 func (d *localDB) endTx(id int64) {
110 d.txMutex.Lock()
111 defer d.txMutex.Unlock()
112 d.txCount.Dec(1)
113 delete(d.txActive, id)
114 }
115
116 // reportActiveTx prints out the list of active transactions.
117 func (d *localDB) reportActiveTx() {
118 d.txMutex.RLock()
119 defer d.txMutex.RUnlock()
120 if len(d.txActive) == 0 {
121 glog.Infof("%s Active Transactions: (none)", d.name)
122 return
123 }
124 txs := make([]string, 0, len(d.txActive))
125 for id, name := range d.txActive {
126 txs = append(txs, fmt.Sprintf(" %d\t%s", id, name))
127 }
128 glog.Infof("%s Active Transactions:\n%s", d.name, strings.Join(txs, "\n" ))
129 }
130
131 // tx is a wrapper for a BoltDB transaction which tracks statistics.
132 func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error {
133 txId := d.startTx(name)
134 defer d.endTx(txId)
135 defer metrics2.NewTimer("db-tx-duration", map[string]string{
136 "database": d.name,
137 "transaction": name,
138 }).Stop()
139 if update {
140 return d.db.Update(fn)
141 } else {
142 return d.db.View(fn)
143 }
144 }
145
146 // view is a wrapper for the BoltDB instance's View method.
147 func (d *localDB) view(name string, fn func(*bolt.Tx) error) error {
148 return d.tx(name, fn, false)
149 }
150
151 // update is a wrapper for the BoltDB instance's Update method.
152 func (d *localDB) update(name string, fn func(*bolt.Tx) error) error {
153 return d.tx(name, fn, true)
154 }
155
156 // Returns the tasks bucket with FillPercent set.
157 func tasksBucket(tx *bolt.Tx) *bolt.Bucket {
158 b := tx.Bucket([]byte(BUCKET_TASKS))
159 b.FillPercent = BUCKET_TASKS_FILL_PERCENT
160 return b
161 }
162
163 // NewDB returns a local DB instance.
164 func NewDB(name, filename string) (db.DB, error) {
165 boltdb, err := bolt.Open(filename, 0600, nil)
166 if err != nil {
167 return nil, err
168 }
169 d := &localDB{
170 name: name,
171 db: boltdb,
172 txCount: metrics2.GetCounter("db-active-tx", map[string]string{
173 "database": name,
174 }),
175 txNextId: 0,
176 txActive: map[int64]string{},
177 }
178 go func() {
179 for _ = range time.Tick(time.Minute) {
180 d.reportActiveTx()
181 }
182 }()
183
184 if err := d.update("NewDB", func(tx *bolt.Tx) error {
185 if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS)); e rr != nil {
186 return err
187 }
188 return nil
189 }); err != nil {
190 return nil, err
191 }
192
193 return d, nil
194 }
195
196 // See docs for DB interface.
197 func (d *localDB) Close() error {
198 d.txMutex.Lock()
199 defer d.txMutex.Unlock()
200 d.txActive = map[int64]string{}
borenet 2016/08/16 19:49:54 nit: Should we also set the counter to zero here?
dogben 2016/08/17 13:29:44 I added a check that there are no active transacti
201
202 return d.db.Close()
203 }
204
205 // Sets t.Id either based on t.Created or now. tx must be an update transaction.
206 func (d *localDB) assignId(tx *bolt.Tx, t *db.Task, now time.Time) error {
207 if t.Id != "" {
208 return fmt.Errorf("Task Id already assigned: %v", t.Id)
209 }
210 ts := now
211 if !util.TimeIsZero(t.Created) {
212 ts = t.Created
213 }
214 seq, err := tasksBucket(tx).NextSequence()
215 if err != nil {
216 return err
217 }
218 t.Id = formatId(ts, seq)
219 return nil
220 }
221
222 // See docs for DB interface.
223 func (d *localDB) AssignId(t *db.Task) error {
224 oldId := t.Id
225 err := d.update("AssignId", func(tx *bolt.Tx) error {
226 return d.assignId(tx, t, time.Now())
227 })
228 if err != nil {
229 t.Id = oldId
230 }
231 return err
232 }
233
234 // See docs for DB interface.
235 func (d *localDB) GetTaskById(id string) (*db.Task, error) {
236 var rv *db.Task
237 if err := d.view("GetTaskById", func(tx *bolt.Tx) error {
238 serialized := tasksBucket(tx).Get([]byte(id))
239 if serialized == nil {
240 return nil
241 }
242 var t db.Task
243 if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&t) ; err != nil {
244 return err
245 }
246 rv = &t
247 return nil
248 }); err != nil {
249 return nil, err
250 }
251 if rv == nil {
252 // Return an error if id is invalid.
253 if _, _, err := parseId(id); err != nil {
254 return nil, err
255 }
256 }
257 return rv, nil
258 }
259
260 // See docs for DB interface.
261 // TODO(benjaminwagner): Filter Tasks based on Task.Created rather than Task.Id.
262 func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error ) {
263 min := []byte(start.UTC().Format(TIMESTAMP_FORMAT))
264 max := []byte(end.UTC().Format(TIMESTAMP_FORMAT))
265 decoder := db.TaskDecoder{}
266 if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error {
267 c := tasksBucket(tx).Cursor()
268 for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
269 cpy := make([]byte, len(v))
270 copy(cpy, v)
271 if !decoder.Process(cpy) {
272 return nil
273 }
274 }
275 return nil
276 }); err != nil {
277 return nil, err
278 }
279 return decoder.Result()
280 }
281
282 // See documentation for DB interface.
283 func (d *localDB) PutTask(t *db.Task) error {
284 return d.PutTasks([]*db.Task{t})
285 }
286
287 // validate returns an error if the task can not be inserted into the DB. Does
288 // not modify t.
289 func (d *localDB) validate(t *db.Task) error {
290 // TODO(benjaminwagner): Check skew between t.Id (if assigned) and t.Cre ated.
291 return nil
292 }
293
294 // See documentation for DB interface.
295 // TODO(benjaminwagner): Figure out how to detect write/write conflicts and
296 // return "concurrent modification" error.
297 func (d *localDB) PutTasks(tasks []*db.Task) error {
298 // If there is an error during the transaction, we should leave the task s
299 // unchanged. Save the old Ids since we set them below.
300 oldIds := make([]string, len(tasks))
301 // Validate and save current Ids.
302 for _, t := range tasks {
303 if err := d.validate(t); err != nil {
304 return err
305 }
306 oldIds = append(oldIds, t.Id)
307 }
308 revertChanges := func() {
309 for i, oldId := range oldIds {
310 tasks[i].Id = oldId
311 }
312 }
313 err := d.update("PutTasks", func(tx *bolt.Tx) error {
314 // Assign Ids and encode.
315 e := db.TaskEncoder{}
316 now := time.Now()
317 for _, t := range tasks {
318 if t.Id == "" {
319 if err := d.assignId(tx, t, now); err != nil {
320 return err
321 }
322 }
323 e.Process(t)
324 }
325 // Insert/update.
326 for {
327 t, serialized, err := e.Next()
328 if err != nil {
329 return err
330 }
331 if t == nil {
332 break
333 }
334 if err := tasksBucket(tx).Put([]byte(t.Id), serialized); err != nil {
335 return err
336 }
337 }
338 return nil
339 })
340 if err != nil {
341 revertChanges()
342 return err
343 } else {
344 // TODO(benjaminwagner): pass serialized bytes.
345 d.modTasks.TrackModifiedTasks(tasks)
346 }
347 return nil
348 }
349
350 // See docs for DB interface.
351 func (d *localDB) GetModifiedTasks(id string) ([]*db.Task, error) {
352 return d.modTasks.GetModifiedTasks(id)
353 }
354
355 // See docs for DB interface.
356 func (d *localDB) StartTrackingModifiedTasks() (string, error) {
357 return d.modTasks.StartTrackingModifiedTasks()
358 }
359
360 // Returns the total number of tasks in the DB.
361 // TODO(benjaminwagner): add a metrics goroutine.
362 func (d *localDB) NumTasks() (int, error) {
363 var n int
364 if err := d.view("NumTasks", func(tx *bolt.Tx) error {
365 n = tasksBucket(tx).Stats().KeyN
366 return nil
367 }); err != nil {
368 return -1, err
369 }
370 return n, nil
371 }
372
373 // RunBackupServer runs an HTTP server which provides downloadable database
374 // backups.
375 func (d *localDB) RunBackupServer(port string) error {
376 r := mux.NewRouter()
377 r.HandleFunc("/backup", func(w http.ResponseWriter, r *http.Request) {
378 if err := d.view("Backup", func(tx *bolt.Tx) error {
379 w.Header().Set("Content-Type", "application/octet-stream ")
380 w.Header().Set("Content-Disposition", "attachment; filen ame=\"tasks.db\"")
381 w.Header().Set("Content-Length", strconv.Itoa(int(tx.Siz e())))
382 _, err := tx.WriteTo(w)
383 return err
384 }); err != nil {
385 glog.Errorf("Failed to create DB backup: %s", err)
386 httputils.ReportError(w, r, err, "Failed to create DB ba ckup")
387 }
388 })
389 http.Handle("/", httputils.LoggingGzipRequestResponse(r))
390 return http.ListenAndServe(port, nil)
391 }
OLDNEW
« no previous file with comments | « build_scheduler/go/db/db_test.go ('k') | build_scheduler/go/db/local_db/local_db_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698