OLD | NEW |
---|---|
(Empty) | |
1 package local_db | |
2 | |
3 import ( | |
4 "bytes" | |
5 "encoding/gob" | |
6 "fmt" | |
7 "net/http" | |
8 "strconv" | |
9 "strings" | |
10 "sync" | |
11 "time" | |
12 | |
13 "github.com/boltdb/bolt" | |
14 "github.com/gorilla/mux" | |
15 "github.com/skia-dev/glog" | |
16 "go.skia.org/infra/build_scheduler/go/db" | |
17 "go.skia.org/infra/go/httputils" | |
18 "go.skia.org/infra/go/metrics2" | |
19 "go.skia.org/infra/go/util" | |
20 ) | |
21 | |
// Storage layout of the tasks bucket.
const (
	// BUCKET_TASKS is the name of the bucket holding Tasks. Each key is a
	// Task.Id, built from (creation time, sequence number) as described on
	// formatId; each value is the GOB encoding of the Task. Tasks are updated in
	// place, and a single bucket is shared by all repos.
	// TODO(benjaminwagner): May need to prefix value with metadata.
	BUCKET_TASKS = "tasks"
	// BUCKET_TASKS_FILL_PERCENT is high because BUCKET_TASKS is append-mostly.
	BUCKET_TASKS_FILL_PERCENT = 0.9
)

// Formats of the two components of a Task ID.
const (
	// TIMESTAMP_FORMAT is the layout given to Time.Format and time.Parse for the
	// timestamp part of a Task ID. It resembles util.RFC3339NanoZeroPad, but
	// most punctuation is dropped because Task.Id can not contain colons. It is
	// only valid for formatting and parsing times in UTC.
	TIMESTAMP_FORMAT = "20060102T150405.000000000Z"
	// SEQUENCE_NUMBER_FORMAT is the verb given to fmt.Sprintf or fmt.Sscanf for
	// the sequence-number part of a Task ID: a 16-digit, zero-padded, lowercase
	// hexadecimal number.
	SEQUENCE_NUMBER_FORMAT = "%016x"
)
42 | |
43 // formatId returns the timestamp and sequence number formatted for a Task ID. | |
44 // Format is "<timestamp>_<sequence_num>", where the timestamp is formatted | |
45 // using TIMESTAMP_FORMAT and sequence_num is formatted using | |
46 // SEQUENCE_NUMBER_FORMAT. | |
47 func formatId(t time.Time, seq uint64) string { | |
48 t = t.UTC() | |
49 return fmt.Sprintf("%s_"+SEQUENCE_NUMBER_FORMAT, t.Format(TIMESTAMP_FORM AT), seq) | |
50 } | |
51 | |
52 // parseId returns the timestamp and sequence number stored in a Task ID. | |
53 func parseId(id string) (time.Time, uint64, error) { | |
54 parts := strings.Split(id, "_") | |
55 if len(parts) != 2 { | |
56 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id) | |
57 } | |
58 t, err := time.Parse(TIMESTAMP_FORMAT, parts[0]) | |
59 if err != nil { | |
60 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr) | |
61 } | |
62 var seq uint64 | |
63 // Add newlines to force Sscanf to match the entire string. Otherwise | |
64 // "123hello" will be parsed as 123. Note that Sscanf does not require 1 6 | |
65 // digits even though SEQUENCE_NUMBER_FORMAT specifies padding to 16 dig its. | |
66 i, err := fmt.Sscanf(parts[1]+"\n", SEQUENCE_NUMBER_FORMAT+"\n", &seq) | |
67 if err != nil { | |
68 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, e rr) | |
69 } else if i != 1 { | |
70 return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected o ne hex number in %s, got %d", id, parts[1], i) | |
71 } | |
72 return t, seq, nil | |
73 } | |
74 | |
75 // localDB accesses a local BoltDB database containing tasks. | |
76 type localDB struct { | |
77 // name is used in logging and metrics to identify this DB. | |
78 name string | |
79 | |
80 // db is the underlying BoltDB. | |
81 db *bolt.DB | |
82 | |
83 // tx fields contain metrics on the number of active transactions. Prote cted | |
84 // by txMutex. | |
85 txCount *metrics2.Counter | |
86 txNextId int64 | |
87 txActive map[int64]string | |
88 txMutex sync.RWMutex | |
89 | |
90 modTasks db.ModifiedTasks | |
91 } | |
92 | |
93 // startTx monitors when a transaction starts. | |
94 func (d *localDB) startTx(name string) int64 { | |
95 d.txMutex.Lock() | |
96 defer d.txMutex.Unlock() | |
97 d.txCount.Inc(1) | |
98 id := d.txNextId | |
99 d.txActive[id] = name | |
100 d.txNextId++ | |
101 return id | |
102 } | |
103 | |
104 // endTx monitors when a transaction ends. | |
105 func (d *localDB) endTx(id int64) { | |
106 d.txMutex.Lock() | |
107 defer d.txMutex.Unlock() | |
108 d.txCount.Dec(1) | |
109 delete(d.txActive, id) | |
110 } | |
111 | |
112 // reportActiveTx prints out the list of active transactions. | |
113 func (d *localDB) reportActiveTx() { | |
114 d.txMutex.RLock() | |
115 defer d.txMutex.RUnlock() | |
116 if len(d.txActive) == 0 { | |
117 glog.Infof("%s Active Transactions: (none)", d.name) | |
118 return | |
119 } | |
120 txs := make([]string, 0, len(d.txActive)) | |
121 for id, name := range d.txActive { | |
122 txs = append(txs, fmt.Sprintf(" %d\t%s", id, name)) | |
123 } | |
124 glog.Infof("%s Active Transactions:\n%s", d.name, strings.Join(txs, "\n" )) | |
125 } | |
126 | |
127 // tx is a wrapper for a BoltDB transaction which tracks statistics. | |
128 func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error { | |
129 txId := d.startTx(name) | |
130 defer d.endTx(txId) | |
131 defer metrics2.NewTimer("db-tx-duration", map[string]string{ | |
132 "database": d.name, | |
133 "transaction": name, | |
134 }).Stop() | |
135 if update { | |
136 return d.db.Update(fn) | |
137 } else { | |
138 return d.db.View(fn) | |
139 } | |
140 } | |
141 | |
142 // view is a wrapper for the BoltDB instance's View method. | |
143 func (d *localDB) view(name string, fn func(*bolt.Tx) error) error { | |
144 return d.tx(name, fn, false) | |
145 } | |
146 | |
147 // update is a wrapper for the BoltDB instance's Update method. | |
148 func (d *localDB) update(name string, fn func(*bolt.Tx) error) error { | |
149 return d.tx(name, fn, true) | |
150 } | |
151 | |
152 // Returns the tasks bucket with FillPercent set. | |
153 func tasksBucket(tx *bolt.Tx) *bolt.Bucket { | |
154 b := tx.Bucket([]byte(BUCKET_TASKS)) | |
155 b.FillPercent = BUCKET_TASKS_FILL_PERCENT | |
156 return b | |
157 } | |
158 | |
159 // NewDB returns a local DB instance. | |
160 func NewDB(name, filename string) (db.DB, error) { | |
161 boltdb, err := bolt.Open(filename, 0600, nil) | |
162 if err != nil { | |
163 return nil, err | |
164 } | |
165 d := &localDB{ | |
166 name: name, | |
167 db: boltdb, | |
168 txCount: metrics2.GetCounter("db-active-tx", map[string]string{ | |
169 "database": name, | |
170 }), | |
171 txNextId: 0, | |
172 txActive: map[int64]string{}, | |
173 } | |
174 go func() { | |
175 for _ = range time.Tick(time.Minute) { | |
176 d.reportActiveTx() | |
177 } | |
178 }() | |
179 | |
180 if err := d.update("NewDB", func(tx *bolt.Tx) error { | |
181 if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS)); e rr != nil { | |
182 return err | |
183 } | |
184 return nil | |
185 }); err != nil { | |
186 return nil, err | |
187 } | |
188 | |
189 return d, nil | |
190 } | |
191 | |
192 // See docs for DB interface. | |
193 func (d *localDB) Close() error { | |
194 d.txMutex.Lock() | |
195 defer d.txMutex.Unlock() | |
196 if len(d.txActive) > 0 { | |
197 return fmt.Errorf("Can not close DB when transactions are active .") | |
198 } | |
199 // TODO(benjaminwagner): Make this work. | |
borenet
2016/08/17 13:51:13
What's not working?
dogben
2016/08/17 13:57:58
Actually, it would be great if you can take a look
| |
200 //if err := d.txCount.Delete(); err != nil { | |
201 // return err | |
202 //} | |
203 d.txActive = map[int64]string{} | |
204 return d.db.Close() | |
205 } | |
206 | |
207 // Sets t.Id either based on t.Created or now. tx must be an update transaction. | |
208 func (d *localDB) assignId(tx *bolt.Tx, t *db.Task, now time.Time) error { | |
209 if t.Id != "" { | |
210 return fmt.Errorf("Task Id already assigned: %v", t.Id) | |
211 } | |
212 ts := now | |
213 if !util.TimeIsZero(t.Created) { | |
214 ts = t.Created | |
215 } | |
216 seq, err := tasksBucket(tx).NextSequence() | |
217 if err != nil { | |
218 return err | |
219 } | |
220 t.Id = formatId(ts, seq) | |
221 return nil | |
222 } | |
223 | |
224 // See docs for DB interface. | |
225 func (d *localDB) AssignId(t *db.Task) error { | |
226 oldId := t.Id | |
227 err := d.update("AssignId", func(tx *bolt.Tx) error { | |
228 return d.assignId(tx, t, time.Now()) | |
229 }) | |
230 if err != nil { | |
231 t.Id = oldId | |
232 } | |
233 return err | |
234 } | |
235 | |
236 // See docs for DB interface. | |
237 func (d *localDB) GetTaskById(id string) (*db.Task, error) { | |
238 var rv *db.Task | |
239 if err := d.view("GetTaskById", func(tx *bolt.Tx) error { | |
240 serialized := tasksBucket(tx).Get([]byte(id)) | |
241 if serialized == nil { | |
242 return nil | |
243 } | |
244 var t db.Task | |
245 if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&t) ; err != nil { | |
246 return err | |
247 } | |
248 rv = &t | |
249 return nil | |
250 }); err != nil { | |
251 return nil, err | |
252 } | |
253 if rv == nil { | |
254 // Return an error if id is invalid. | |
255 if _, _, err := parseId(id); err != nil { | |
256 return nil, err | |
257 } | |
258 } | |
259 return rv, nil | |
260 } | |
261 | |
262 // See docs for DB interface. | |
263 // TODO(benjaminwagner): Filter Tasks based on Task.Created rather than Task.Id. | |
264 func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error ) { | |
265 min := []byte(start.UTC().Format(TIMESTAMP_FORMAT)) | |
266 max := []byte(end.UTC().Format(TIMESTAMP_FORMAT)) | |
267 decoder := db.TaskDecoder{} | |
268 if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error { | |
269 c := tasksBucket(tx).Cursor() | |
270 for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() { | |
271 cpy := make([]byte, len(v)) | |
272 copy(cpy, v) | |
273 if !decoder.Process(cpy) { | |
274 return nil | |
275 } | |
276 } | |
277 return nil | |
278 }); err != nil { | |
279 return nil, err | |
280 } | |
281 return decoder.Result() | |
282 } | |
283 | |
284 // See documentation for DB interface. | |
285 func (d *localDB) PutTask(t *db.Task) error { | |
286 return d.PutTasks([]*db.Task{t}) | |
287 } | |
288 | |
289 // validate returns an error if the task can not be inserted into the DB. Does | |
290 // not modify t. | |
291 func (d *localDB) validate(t *db.Task) error { | |
292 // TODO(benjaminwagner): Check skew between t.Id (if assigned) and t.Cre ated. | |
293 return nil | |
294 } | |
295 | |
296 // See documentation for DB interface. | |
297 // TODO(benjaminwagner): Figure out how to detect write/write conflicts and | |
298 // return "concurrent modification" error. | |
299 func (d *localDB) PutTasks(tasks []*db.Task) error { | |
300 // If there is an error during the transaction, we should leave the task s | |
301 // unchanged. Save the old Ids since we set them below. | |
302 oldIds := make([]string, len(tasks)) | |
303 // Validate and save current Ids. | |
304 for _, t := range tasks { | |
305 if err := d.validate(t); err != nil { | |
306 return err | |
307 } | |
308 oldIds = append(oldIds, t.Id) | |
309 } | |
310 revertChanges := func() { | |
311 for i, oldId := range oldIds { | |
312 tasks[i].Id = oldId | |
313 } | |
314 } | |
315 err := d.update("PutTasks", func(tx *bolt.Tx) error { | |
316 // Assign Ids and encode. | |
317 e := db.TaskEncoder{} | |
318 now := time.Now() | |
319 for _, t := range tasks { | |
320 if t.Id == "" { | |
321 if err := d.assignId(tx, t, now); err != nil { | |
322 return err | |
323 } | |
324 } | |
325 e.Process(t) | |
326 } | |
327 // Insert/update. | |
328 for { | |
329 t, serialized, err := e.Next() | |
330 if err != nil { | |
331 return err | |
332 } | |
333 if t == nil { | |
334 break | |
335 } | |
336 if err := tasksBucket(tx).Put([]byte(t.Id), serialized); err != nil { | |
337 return err | |
338 } | |
339 } | |
340 return nil | |
341 }) | |
342 if err != nil { | |
343 revertChanges() | |
344 return err | |
345 } else { | |
346 // TODO(benjaminwagner): pass serialized bytes. | |
347 d.modTasks.TrackModifiedTasks(tasks) | |
348 } | |
349 return nil | |
350 } | |
351 | |
352 // See docs for DB interface. | |
353 func (d *localDB) GetModifiedTasks(id string) ([]*db.Task, error) { | |
354 return d.modTasks.GetModifiedTasks(id) | |
355 } | |
356 | |
357 // See docs for DB interface. | |
358 func (d *localDB) StartTrackingModifiedTasks() (string, error) { | |
359 return d.modTasks.StartTrackingModifiedTasks() | |
360 } | |
361 | |
362 // Returns the total number of tasks in the DB. | |
363 // TODO(benjaminwagner): add a metrics goroutine. | |
364 func (d *localDB) NumTasks() (int, error) { | |
365 var n int | |
366 if err := d.view("NumTasks", func(tx *bolt.Tx) error { | |
367 n = tasksBucket(tx).Stats().KeyN | |
368 return nil | |
369 }); err != nil { | |
370 return -1, err | |
371 } | |
372 return n, nil | |
373 } | |
374 | |
375 // RunBackupServer runs an HTTP server which provides downloadable database | |
376 // backups. | |
377 func (d *localDB) RunBackupServer(port string) error { | |
378 r := mux.NewRouter() | |
379 r.HandleFunc("/backup", func(w http.ResponseWriter, r *http.Request) { | |
380 if err := d.view("Backup", func(tx *bolt.Tx) error { | |
381 w.Header().Set("Content-Type", "application/octet-stream ") | |
382 w.Header().Set("Content-Disposition", "attachment; filen ame=\"tasks.db\"") | |
383 w.Header().Set("Content-Length", strconv.Itoa(int(tx.Siz e()))) | |
384 _, err := tx.WriteTo(w) | |
385 return err | |
386 }); err != nil { | |
387 glog.Errorf("Failed to create DB backup: %s", err) | |
388 httputils.ReportError(w, r, err, "Failed to create DB ba ckup") | |
389 } | |
390 }) | |
391 http.Handle("/", httputils.LoggingGzipRequestResponse(r)) | |
392 return http.ListenAndServe(port, nil) | |
393 } | |
OLD | NEW |