OLD | NEW |
---|---|
(Empty) | |
1 package local_db | |
2 | |
3 import ( | |
4 "bytes" | |
5 "encoding/gob" | |
6 "fmt" | |
7 "net/http" | |
8 "strconv" | |
9 "strings" | |
10 "sync" | |
11 "time" | |
12 | |
13 "github.com/boltdb/bolt" | |
14 "github.com/gorilla/mux" | |
15 "github.com/skia-dev/glog" | |
16 "go.skia.org/infra/build_scheduler/go/db" | |
17 "go.skia.org/infra/go/httputils" | |
18 "go.skia.org/infra/go/metrics2" | |
19 "go.skia.org/infra/go/util" | |
20 ) | |
21 | |
const (
	// BUCKET_TASKS is the name of the BoltDB bucket that stores Tasks. Key is
	// Task.Id, which is set to (creation time, sequence number) (see formatId
	// for detail), value is the GOB of the task. Tasks will be updated in
	// place. All repos share the same bucket.
	// TODO(benjaminwagner): May need to prefix value with metadata.
	BUCKET_TASKS = "tasks"
	// BUCKET_TASKS_FILL_PERCENT is applied to the tasks bucket by tasksBucket.
	// BUCKET_TASKS will be append-mostly, so use a high fill percent.
	BUCKET_TASKS_FILL_PERCENT = 0.9

	// TIMESTAMP_FORMAT is similar to util.RFC3339NanoZeroPad, but since Task.Id
	// can not contain colons, we omit most of the punctuation. The trailing "Z"
	// is a literal, not a zone specifier, so this timestamp can only be used to
	// format and parse times in UTC.
	TIMESTAMP_FORMAT = "20060102T150405.000000000Z"
)
36 | |
37 // formatId returns the timestamp and sequence number formatted for a Task ID. | |
38 // Format is "<encoded_timestamp>_<encoded_sequence_num>", where: | |
39 // - encoded_timestamp is the timestamp formatted using TIMESTAMP_FORMAT | |
40 // - encoded_sequence_num is | |
41 // "<lexical_ordering_char><hexidecimal_sequence_number>" | |
42 // - lexical_ordering_char is len(hexidecimal_sequence_number)-1 encoded as a | |
43 // hexidecimal digit, to allow lexical ordering of IDs. | |
44 // - hexidecimal_sequence_number is the sequence number formatted in | |
45 // hexidecimal digits with no padding. | |
46 // (Note that since the sequence number is uint64, the maximum possible value of | |
47 // len(hexidecimal_sequence_number)-1 is 15, so it can always be encoded as a | |
48 // single hexidecimal digit.) | |
borenet
2016/08/16 19:49:54
Why not just have this function zero-pad the seque
dogben
2016/08/17 13:29:44
You're right. I thought it was overkill to have th
| |
49 func formatId(t time.Time, seq uint64) string { | |
50 t = t.UTC() | |
51 h := fmt.Sprintf("%x", seq) | |
52 return fmt.Sprintf("%s_%x%s", t.Format(TIMESTAMP_FORMAT), len(h)-1, h) | |
53 } | |
54 | |
55 // parseId returns the timestamp and sequence number stored in a Task ID. | |
56 func parseId(id string) (time.Time, uint64, error) { | |
57 parts := strings.Split(id, "_") | |
58 if len(parts) != 2 { | |
59 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id) | |
60 } | |
61 t, err := time.Parse(TIMESTAMP_FORMAT, parts[0]) | |
62 if err != nil { | |
63 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr) | |
64 } | |
65 var seq uint64 | |
66 // Skip the first character (lexicographic ordering character). | |
67 if len(parts[1]) < 2 { | |
68 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected a t least 2 hex digits in second part.", id) | |
69 } | |
70 i, err := fmt.Sscanf(parts[1][1:], "%x", &seq) | |
71 if err != nil { | |
72 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr) | |
73 } else if i != 1 { | |
74 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected o ne hex number in %s, got %d", id, parts[1][1:], i) | |
75 } | |
76 return t, seq, nil | |
77 } | |
78 | |
// localDB accesses a local BoltDB database containing tasks.
type localDB struct {
	// name is used in logging and metrics to identify this DB.
	name string

	// db is the underlying BoltDB.
	db *bolt.DB

	// tx fields contain metrics on the number of active transactions. Protected
	// by txMutex.
	// txCount is incremented by startTx and decremented by endTx.
	txCount *metrics2.Counter
	// txNextId is the ID startTx will hand to the next transaction.
	txNextId int64
	// txActive maps the ID of each in-flight transaction to its name.
	txActive map[int64]string
	txMutex  sync.RWMutex

	// modTasks backs GetModifiedTasks and StartTrackingModifiedTasks; PutTasks
	// reports successfully written tasks to it.
	modTasks db.ModifiedTasks
}
96 | |
97 // startTx monitors when a transaction starts. | |
98 func (d *localDB) startTx(name string) int64 { | |
99 d.txMutex.Lock() | |
100 defer d.txMutex.Unlock() | |
101 d.txCount.Inc(1) | |
102 id := d.txNextId | |
103 d.txActive[id] = name | |
104 d.txNextId++ | |
105 return id | |
106 } | |
107 | |
108 // endTx monitors when a transaction ends. | |
109 func (d *localDB) endTx(id int64) { | |
110 d.txMutex.Lock() | |
111 defer d.txMutex.Unlock() | |
112 d.txCount.Dec(1) | |
113 delete(d.txActive, id) | |
114 } | |
115 | |
116 // reportActiveTx prints out the list of active transactions. | |
117 func (d *localDB) reportActiveTx() { | |
118 d.txMutex.RLock() | |
119 defer d.txMutex.RUnlock() | |
120 if len(d.txActive) == 0 { | |
121 glog.Infof("%s Active Transactions: (none)", d.name) | |
122 return | |
123 } | |
124 txs := make([]string, 0, len(d.txActive)) | |
125 for id, name := range d.txActive { | |
126 txs = append(txs, fmt.Sprintf(" %d\t%s", id, name)) | |
127 } | |
128 glog.Infof("%s Active Transactions:\n%s", d.name, strings.Join(txs, "\n" )) | |
129 } | |
130 | |
131 // tx is a wrapper for a BoltDB transaction which tracks statistics. | |
132 func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error { | |
133 txId := d.startTx(name) | |
134 defer d.endTx(txId) | |
135 defer metrics2.NewTimer("db-tx-duration", map[string]string{ | |
136 "database": d.name, | |
137 "transaction": name, | |
138 }).Stop() | |
139 if update { | |
140 return d.db.Update(fn) | |
141 } else { | |
142 return d.db.View(fn) | |
143 } | |
144 } | |
145 | |
146 // view is a wrapper for the BoltDB instance's View method. | |
147 func (d *localDB) view(name string, fn func(*bolt.Tx) error) error { | |
148 return d.tx(name, fn, false) | |
149 } | |
150 | |
151 // update is a wrapper for the BoltDB instance's Update method. | |
152 func (d *localDB) update(name string, fn func(*bolt.Tx) error) error { | |
153 return d.tx(name, fn, true) | |
154 } | |
155 | |
156 // Returns the tasks bucket with FillPercent set. | |
157 func tasksBucket(tx *bolt.Tx) *bolt.Bucket { | |
158 b := tx.Bucket([]byte(BUCKET_TASKS)) | |
159 b.FillPercent = BUCKET_TASKS_FILL_PERCENT | |
160 return b | |
161 } | |
162 | |
163 // NewDB returns a local DB instance. | |
164 func NewDB(name, filename string) (db.DB, error) { | |
165 boltdb, err := bolt.Open(filename, 0600, nil) | |
166 if err != nil { | |
167 return nil, err | |
168 } | |
169 d := &localDB{ | |
170 name: name, | |
171 db: boltdb, | |
172 txCount: metrics2.GetCounter("db-active-tx", map[string]string{ | |
173 "database": name, | |
174 }), | |
175 txNextId: 0, | |
176 txActive: map[int64]string{}, | |
177 } | |
178 go func() { | |
179 for _ = range time.Tick(time.Minute) { | |
180 d.reportActiveTx() | |
181 } | |
182 }() | |
183 | |
184 if err := d.update("NewDB", func(tx *bolt.Tx) error { | |
185 if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS)); e rr != nil { | |
186 return err | |
187 } | |
188 return nil | |
189 }); err != nil { | |
190 return nil, err | |
191 } | |
192 | |
193 return d, nil | |
194 } | |
195 | |
196 // See docs for DB interface. | |
197 func (d *localDB) Close() error { | |
198 d.txMutex.Lock() | |
199 defer d.txMutex.Unlock() | |
200 d.txActive = map[int64]string{} | |
borenet
2016/08/16 19:49:54
nit: Should we also set the counter to zero here?
dogben
2016/08/17 13:29:44
I added a check that there are no active transacti
| |
201 | |
202 return d.db.Close() | |
203 } | |
204 | |
205 // Sets t.Id either based on t.Created or now. tx must be an update transaction. | |
206 func (d *localDB) assignId(tx *bolt.Tx, t *db.Task, now time.Time) error { | |
207 if t.Id != "" { | |
208 return fmt.Errorf("Task Id already assigned: %v", t.Id) | |
209 } | |
210 ts := now | |
211 if !util.TimeIsZero(t.Created) { | |
212 ts = t.Created | |
213 } | |
214 seq, err := tasksBucket(tx).NextSequence() | |
215 if err != nil { | |
216 return err | |
217 } | |
218 t.Id = formatId(ts, seq) | |
219 return nil | |
220 } | |
221 | |
222 // See docs for DB interface. | |
223 func (d *localDB) AssignId(t *db.Task) error { | |
224 oldId := t.Id | |
225 err := d.update("AssignId", func(tx *bolt.Tx) error { | |
226 return d.assignId(tx, t, time.Now()) | |
227 }) | |
228 if err != nil { | |
229 t.Id = oldId | |
230 } | |
231 return err | |
232 } | |
233 | |
234 // See docs for DB interface. | |
235 func (d *localDB) GetTaskById(id string) (*db.Task, error) { | |
236 var rv *db.Task | |
237 if err := d.view("GetTaskById", func(tx *bolt.Tx) error { | |
238 serialized := tasksBucket(tx).Get([]byte(id)) | |
239 if serialized == nil { | |
240 return nil | |
241 } | |
242 var t db.Task | |
243 if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&t) ; err != nil { | |
244 return err | |
245 } | |
246 rv = &t | |
247 return nil | |
248 }); err != nil { | |
249 return nil, err | |
250 } | |
251 if rv == nil { | |
252 // Return an error if id is invalid. | |
253 if _, _, err := parseId(id); err != nil { | |
254 return nil, err | |
255 } | |
256 } | |
257 return rv, nil | |
258 } | |
259 | |
260 // See docs for DB interface. | |
261 // TODO(benjaminwagner): Filter Tasks based on Task.Created rather than Task.Id. | |
262 func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error ) { | |
263 min := []byte(start.UTC().Format(TIMESTAMP_FORMAT)) | |
264 max := []byte(end.UTC().Format(TIMESTAMP_FORMAT)) | |
265 decoder := db.TaskDecoder{} | |
266 if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error { | |
267 c := tasksBucket(tx).Cursor() | |
268 for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() { | |
269 cpy := make([]byte, len(v)) | |
270 copy(cpy, v) | |
271 if !decoder.Process(cpy) { | |
272 return nil | |
273 } | |
274 } | |
275 return nil | |
276 }); err != nil { | |
277 return nil, err | |
278 } | |
279 return decoder.Result() | |
280 } | |
281 | |
282 // See documentation for DB interface. | |
283 func (d *localDB) PutTask(t *db.Task) error { | |
284 return d.PutTasks([]*db.Task{t}) | |
285 } | |
286 | |
// validate returns an error if the task can not be inserted into the DB. Does
// not modify t. No checks are implemented yet, so it currently always returns
// nil.
func (d *localDB) validate(t *db.Task) error {
	// TODO(benjaminwagner): Check skew between t.Id (if assigned) and t.Created.
	return nil
}
293 | |
294 // See documentation for DB interface. | |
295 // TODO(benjaminwagner): Figure out how to detect write/write conflicts and | |
296 // return "concurrent modification" error. | |
297 func (d *localDB) PutTasks(tasks []*db.Task) error { | |
298 // If there is an error during the transaction, we should leave the task s | |
299 // unchanged. Save the old Ids since we set them below. | |
300 oldIds := make([]string, len(tasks)) | |
301 // Validate and save current Ids. | |
302 for _, t := range tasks { | |
303 if err := d.validate(t); err != nil { | |
304 return err | |
305 } | |
306 oldIds = append(oldIds, t.Id) | |
307 } | |
308 revertChanges := func() { | |
309 for i, oldId := range oldIds { | |
310 tasks[i].Id = oldId | |
311 } | |
312 } | |
313 err := d.update("PutTasks", func(tx *bolt.Tx) error { | |
314 // Assign Ids and encode. | |
315 e := db.TaskEncoder{} | |
316 now := time.Now() | |
317 for _, t := range tasks { | |
318 if t.Id == "" { | |
319 if err := d.assignId(tx, t, now); err != nil { | |
320 return err | |
321 } | |
322 } | |
323 e.Process(t) | |
324 } | |
325 // Insert/update. | |
326 for { | |
327 t, serialized, err := e.Next() | |
328 if err != nil { | |
329 return err | |
330 } | |
331 if t == nil { | |
332 break | |
333 } | |
334 if err := tasksBucket(tx).Put([]byte(t.Id), serialized); err != nil { | |
335 return err | |
336 } | |
337 } | |
338 return nil | |
339 }) | |
340 if err != nil { | |
341 revertChanges() | |
342 return err | |
343 } else { | |
344 // TODO(benjaminwagner): pass serialized bytes. | |
345 d.modTasks.TrackModifiedTasks(tasks) | |
346 } | |
347 return nil | |
348 } | |
349 | |
350 // See docs for DB interface. | |
351 func (d *localDB) GetModifiedTasks(id string) ([]*db.Task, error) { | |
352 return d.modTasks.GetModifiedTasks(id) | |
353 } | |
354 | |
355 // See docs for DB interface. | |
356 func (d *localDB) StartTrackingModifiedTasks() (string, error) { | |
357 return d.modTasks.StartTrackingModifiedTasks() | |
358 } | |
359 | |
360 // Returns the total number of tasks in the DB. | |
361 // TODO(benjaminwagner): add a metrics goroutine. | |
362 func (d *localDB) NumTasks() (int, error) { | |
363 var n int | |
364 if err := d.view("NumTasks", func(tx *bolt.Tx) error { | |
365 n = tasksBucket(tx).Stats().KeyN | |
366 return nil | |
367 }); err != nil { | |
368 return -1, err | |
369 } | |
370 return n, nil | |
371 } | |
372 | |
373 // RunBackupServer runs an HTTP server which provides downloadable database | |
374 // backups. | |
375 func (d *localDB) RunBackupServer(port string) error { | |
376 r := mux.NewRouter() | |
377 r.HandleFunc("/backup", func(w http.ResponseWriter, r *http.Request) { | |
378 if err := d.view("Backup", func(tx *bolt.Tx) error { | |
379 w.Header().Set("Content-Type", "application/octet-stream ") | |
380 w.Header().Set("Content-Disposition", "attachment; filen ame=\"tasks.db\"") | |
381 w.Header().Set("Content-Length", strconv.Itoa(int(tx.Siz e()))) | |
382 _, err := tx.WriteTo(w) | |
383 return err | |
384 }); err != nil { | |
385 glog.Errorf("Failed to create DB backup: %s", err) | |
386 httputils.ReportError(w, r, err, "Failed to create DB ba ckup") | |
387 } | |
388 }) | |
389 http.Handle("/", httputils.LoggingGzipRequestResponse(r)) | |
390 return http.ListenAndServe(port, nil) | |
391 } | |
OLD | NEW |