Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Side by Side Diff: trace/service/impl.go

Issue 1411663004: Create gRPC client and server, traceservice, that stores trace data in a BoltDB backend. (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: fix vet Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « trace/service/README.md ('k') | trace/service/traceservice.proto » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 package traceservice
2
3 // Generate the go code from the protocol buffer definitions.
4 //go:generate protoc --go_out=plugins=grpc:. traceservice.proto
5
6 import (
7 "bytes"
8 "encoding/binary"
9 "fmt"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/boltdb/bolt"
15 "github.com/golang/groupcache/lru"
16 "github.com/golang/protobuf/proto"
17 "github.com/skia-dev/glog"
18 "golang.org/x/net/context"
19 )
20
21 const (
22 COMMIT_BUCKET_NAME = "commits"
23 TRACE_BUCKET_NAME = "traces"
24 TRACEID_BUCKET_NAME = "traceids"
25 LARGEST_TRACEID_KEY = "the largest trace64id"
26
27 // How many items to keep in the in-memory LRU cache.
28 MAX_INT64_ID_CACHED = 1024 * 1024
29 )
30
31 // bytesFromUint64 converts a uint64 to a []byte.
32 func bytesFromUint64(u uint64) []byte {
33 ret := make([]byte, 8, 8)
34 binary.LittleEndian.PutUint64(ret, u)
35 return ret
36 }
37
38 // CommitIDToByes serializes the CommitID to a []byte in the same format that Co mmitIDFromBytes reads.
39 //
40 // The []byte is constructed so that serialized CommitIDs are comparable via byt es.Compare
41 // with earlier commits coming before later commits.
42 func CommitIDToBytes(c *CommitID) ([]byte, error) {
43 if strings.Contains(c.Id, "!") || strings.Contains(c.Source, "!") {
44 return nil, fmt.Errorf("Invalid CommitID: Must not contain '!': %#v", *c)
45 }
46 return []byte(fmt.Sprintf("%s!%s!%s", time.Unix(c.Timestamp, 0).Format(t ime.RFC3339), c.Id, c.Source)), nil
47 }
48
49 // CommitIDFromBytes creates a CommitID from a []byte, usually produced from a
50 // previously serialized CommitID. See ToBytes.
51 func CommitIDFromBytes(b []byte) (*CommitID, error) {
52 s := string(b)
53 parts := strings.SplitN(s, "!", 3)
54 if len(parts) != 3 {
55 return nil, fmt.Errorf("Invalid CommitID format %s", s)
56 }
57 t, err := time.Parse(time.RFC3339, parts[0])
58 if err != nil {
59 return nil, fmt.Errorf("Invalid CommitID time format %s: %s", s, err)
60 }
61 return &CommitID{
62 Timestamp: t.Unix(),
63 Id: parts[1],
64 Source: parts[2],
65 }, nil
66 }
67
68 // TraceServiceImpl implements TraceServiceServer.
69 type TraceServiceImpl struct {
70 // db is the BoltDB datastore we actually store the data in.
71 db *bolt.DB
72
73 // cache is an in-memory LRU cache for traceids and trace64ids.
74 cache *lru.Cache
75
76 // mutex controls access to cache.
77 mutex sync.Mutex
78 }
79
80 // NewTraceServiceServer creates a new DB that stores the data in BoltDB format at
81 // the given filename location.
82 func NewTraceServiceServer(filename string) (*TraceServiceImpl, error) {
83 d, err := bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Seco nd})
84 if err != nil {
85 return nil, fmt.Errorf("Failed to open BoltDB at %s: %s", filena me, err)
86 }
87 createBuckets := func(tx *bolt.Tx) error {
88 _, err := tx.CreateBucketIfNotExists([]byte(COMMIT_BUCKET_NAME))
89 if err != nil {
90 return fmt.Errorf("Failed to create bucket %s: %s", COMM IT_BUCKET_NAME, err)
91 }
92 _, err = tx.CreateBucketIfNotExists([]byte(TRACE_BUCKET_NAME))
93 if err != nil {
94 return fmt.Errorf("Failed to create bucket %s: %s", TRAC E_BUCKET_NAME, err)
95 }
96 _, err = tx.CreateBucketIfNotExists([]byte(TRACEID_BUCKET_NAME))
97 if err != nil {
98 return fmt.Errorf("Failed to create bucket %s: %s", TRAC EID_BUCKET_NAME, err)
99 }
100 return nil
101 }
102 if err := d.Update(createBuckets); err != nil {
103 return nil, fmt.Errorf("Failed to create buckets: %s", err)
104 }
105 return &TraceServiceImpl{
106 db: d,
107 cache: lru.New(MAX_INT64_ID_CACHED),
108 }, nil
109 }
110
111 // atomize looks up the 64bit id for the given strings.
112 //
113 // If not found create a new mapping uint64->id and store in the datastore.
114 // Also stores the reverse mapping in the same bucket.
115 func (ts *TraceServiceImpl) atomize(ids []string) (map[string]uint64, error) {
116 ret := map[string]uint64{}
117
118 // First look up everything in the LRU cache.
119 notcached := []string{}
120 for _, id := range ids {
121 ts.mutex.Lock()
122 cached, ok := ts.cache.Get(id)
123 ts.mutex.Unlock()
124 if ok {
125 ret[id] = cached.(uint64)
126 } else {
127 notcached = append(notcached, id)
128 }
129 }
130
131 // If we've found all our answers in the LRU cache then we're done.
132 if len(ret) == len(ids) {
133 return ret, nil
134 }
135
136 // Next look into the datastore and see if we can find the answers there .
137 notstored := []string{}
138 get := func(tx *bolt.Tx) error {
139 t := tx.Bucket([]byte(TRACEID_BUCKET_NAME))
140 for _, id := range notcached {
141 // Find the value in the datastore.
142 if bid64 := t.Get([]byte(id)); bid64 == nil {
143 notstored = append(notstored, id)
144 } else {
145 // If found update the return value and the cach e.
146 id64 := binary.LittleEndian.Uint64(bid64)
147 ts.mutex.Lock()
148 ts.cache.Add(id, id64)
149 ts.cache.Add(id64, id)
150 ts.mutex.Unlock()
151 ret[id] = id64
152 }
153 }
154 return nil
155 }
156
157 if err := ts.db.View(get); err != nil {
158 return nil, fmt.Errorf("Error while reading trace ids: %s", err)
159 }
160
161 if len(ret) == len(ids) {
162 return ret, nil
163 }
164
165 // If we still have ids that we haven't matching trace64ids for then we need
166 // to create trace64ids for them and store them in BoltDB and in the LRU
167 // cache.
168 add := func(tx *bolt.Tx) error {
169 t := tx.Bucket([]byte(TRACEID_BUCKET_NAME))
170 // Find the current largest trace64id.
171 var largest uint64 = 0
172 if blargest := t.Get([]byte(LARGEST_TRACEID_KEY)); blargest != n il {
173 largest = binary.LittleEndian.Uint64(blargest)
174 }
175
176 // Generate a new id for each traceid and store the results.
177 for i, id := range notstored {
178 value := largest + uint64(i) + 1
179 bvalue := make([]byte, 8, 8)
180 binary.LittleEndian.PutUint64(bvalue, value)
181 if err := t.Put([]byte(id), bvalue); err != nil {
182 return fmt.Errorf("Failed to write atomized valu e for %s: %s", id, err)
183 }
184 if err := t.Put(bvalue, []byte(id)); err != nil {
185 return fmt.Errorf("Failed to write atomized reve rse lookup value for %s: %s", id, err)
186 }
187 ts.mutex.Lock()
188 ts.cache.Add(id, value)
189 ts.cache.Add(value, id)
190 ts.mutex.Unlock()
191 ret[id] = value
192 }
193
194 largest = largest + uint64(len(notstored))
195
196 // Write the new value for LARGEST_TRACEID_KEY.
197 blargest := make([]byte, 8, 8)
198 binary.LittleEndian.PutUint64(blargest, largest)
199 if err := t.Put([]byte(LARGEST_TRACEID_KEY), blargest); err != n il {
200 return fmt.Errorf("Failed to write an updated largest tr ace64id value: %s", err)
201 }
202
203 return nil
204 }
205
206 if err := ts.db.Update(add); err != nil {
207 return nil, fmt.Errorf("Error while writing new trace ids: %s", err)
208 }
209
210 if len(ret) == len(ids) {
211 return ret, nil
212 } else {
213 return nil, fmt.Errorf("Failed to add traceid ids: mismatched nu mber of ids.")
214 }
215 }
216
217 // commitinfo is the value stored in the commit bucket.
218 type commitinfo struct {
219 Values map[uint64][]byte
220 }
221
222 // newCommitInfo returns a commitinfo with data deserialized from the byte slice .
223 func newCommitInfo(volatile []byte) (*commitinfo, error) {
224 ret := &commitinfo{
225 Values: map[uint64][]byte{},
226 }
227 if len(volatile) == 0 {
228 return ret, nil
229 }
230
231 b := make([]byte, len(volatile), len(volatile))
232 n := copy(b, volatile)
233 if n != len(volatile) {
234 return nil, fmt.Errorf("Failed to copy all the bytes.")
235 }
236 // The byte slice is structured as a repeating set of:
237 //
238 // [uint64 (8 bytes)][length of the value (1 byte)][value (0-256 bytes, as determined by previous byte)]
239 //
240 for len(b) > 0 {
241 if len(b) < 9 {
242 return nil, fmt.Errorf("Failed to decode, not enough byt es left: %#v", b)
243 }
244 key := binary.LittleEndian.Uint64(b[0:8])
245 length := b[8]
246 b = b[9:]
247 if len(b) < int(length) {
248 return nil, fmt.Errorf("Failed to decode, not enough byt es left for %d: Want %d Got %d", key, length, len(b))
249 }
250 ret.Values[key] = b[0:length]
251 b = b[length:]
252 }
253 return ret, nil
254 }
255
256 // ToBytes serializes the data in the commitinfo into a byte slice. The format i s ingestable by FromBytes.
257 //
258 // The byte slice is structured as a repeating set of three things serialized as bytes.
259 //
260 // 1. uint64
261 // 2. length of the value
262 // 3. the actual value.
263 //
264 // So in the byte slice this would look like:
265 //
266 // [uint64 (8 bytes)][length of the value (1 byte)][value (0-255 bytes, as det ermined by previous byte)]
267 //
268 func (c *commitinfo) ToBytes() []byte {
269 size := 0
270 for _, v := range c.Values {
271 size += 9 + len(v)
272 }
273 buf := make([]byte, 0, size)
274 for k, v := range c.Values {
275 buf = append(buf, bytesFromUint64(k)...)
276 buf = append(buf, byte(len(v)))
277 buf = append(buf, v...)
278 }
279 return buf
280 }
281
282 func (ts *TraceServiceImpl) MissingParams(ctx context.Context, in *MissingParams Request) (*MissingParamsResponse, error) {
283 resp := &MissingParamsResponse{
284 Traceids: []string{},
285 }
286
287 // Populate the response with traceids we can't find in the bucket.
288 get := func(tx *bolt.Tx) error {
289 t := tx.Bucket([]byte(TRACE_BUCKET_NAME))
290 for _, traceid := range in.Traceids {
291 if b := t.Get([]byte(traceid)); b == nil {
292 resp.Traceids = append(resp.Traceids, traceid)
293 }
294 }
295 return nil
296 }
297 if err := ts.db.View(get); err != nil {
298 return nil, fmt.Errorf("Failed to add values to tracedb: %s", er r)
299 }
300 return resp, nil
301 }
302
303 func (ts *TraceServiceImpl) AddParams(ctx context.Context, in *AddParamsRequest) (*EmptyResponse, error) {
304 // Serialize the Params for each trace as a proto and collect the tracei ds.
305 // We do this outside the add func so there's less work taking place in the
306 // Update transaction.
307 params := map[string][]byte{}
308 var err error
309 for key, value := range in.Params {
310 ti := &StoredEntry{
311 Params: value,
312 }
313 params[key], err = proto.Marshal(ti)
314 if err != nil {
315 return nil, fmt.Errorf("Failed to serialize the Params: %s", err)
316 }
317 }
318
319 // Add the Params for each traceid to the bucket.
320 add := func(tx *bolt.Tx) error {
321 t := tx.Bucket([]byte(TRACE_BUCKET_NAME))
322 for traceid, _ := range in.Params {
323 if err := t.Put([]byte(traceid), params[traceid]); err ! = nil {
324 return fmt.Errorf("Failed to write the trace inf o for %s: %s", traceid, err)
325 }
326 }
327 return nil
328 }
329 if err := ts.db.Update(add); err != nil {
330 return nil, fmt.Errorf("Failed to add values to tracedb: %s", er r)
331 }
332 return &EmptyResponse{}, nil
333 }
334
335 func (ts *TraceServiceImpl) Add(ctx context.Context, in *AddRequest) (*EmptyResp onse, error) {
336 glog.Info("Add() begin.")
337 if in == nil {
338 return nil, fmt.Errorf("Received nil request.")
339 }
340 if in.Commitid == nil {
341 return nil, fmt.Errorf("Received nil CommitID")
342 }
343 if in.Entries == nil {
344 return nil, fmt.Errorf("Received nil Entries")
345 }
346
347 // Get the trace64ids for each traceid.
348 keys := []string{}
349 for key, _ := range in.Entries {
350 keys = append(keys, key)
351 }
352
353 trace64ids, err := ts.atomize(keys)
354 glog.Infof("atomized %d keys", len(trace64ids))
355 if err != nil {
356 return nil, fmt.Errorf("Failed to create short trace ids: %s", e rr)
357 }
358
359 add := func(tx *bolt.Tx) error {
360 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME))
361
362 // Write the commitinfo.
363 key, err := CommitIDToBytes(in.Commitid)
364 if err != nil {
365 return err
366 }
367
368 // First load the existing info.
369 data, err := newCommitInfo(c.Get(key))
370 if err != nil {
371 return fmt.Errorf("Unable to decode stored values: %s", err)
372 }
373
374 // Add our new data points.
375 for key, entry := range in.Entries {
376 data.Values[trace64ids[key]] = entry
377 }
378
379 // Write to the datastore.
380 if err := c.Put(key, data.ToBytes()); err != nil {
381 return fmt.Errorf("Failed to write the trace info for %s : %s", key, err)
382 }
383 return nil
384 }
385
386 if err := ts.db.Update(add); err != nil {
387 return nil, fmt.Errorf("Failed to add values to tracedb: %s", er r)
388 }
389 return &EmptyResponse{}, nil
390 }
391
392 func (ts *TraceServiceImpl) Remove(ctx context.Context, in *RemoveRequest) (*Emp tyResponse, error) {
393 if in == nil {
394 return nil, fmt.Errorf("Received nil request.")
395 }
396 remove := func(tx *bolt.Tx) error {
397 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME))
398 key, err := CommitIDToBytes(in.Commitid)
399 if err != nil {
400 return err
401 }
402 return c.Delete(key)
403 }
404 if err := ts.db.Update(remove); err != nil {
405 return nil, fmt.Errorf("Failed to remove values from tracedb: %s ", err)
406 }
407 ret := &EmptyResponse{}
408 return ret, nil
409 }
410
411 func (ts *TraceServiceImpl) List(ctx context.Context, listRequest *ListRequest) (*ListResponse, error) {
412 if listRequest == nil {
413 return nil, fmt.Errorf("Received nil request.")
414 }
415 // Convert the begin and end timestamps into RFC3339 strings we can use for prefix matching.
416 begin := []byte(time.Unix(listRequest.Begin, 0).Format(time.RFC3339))
417 end := []byte(time.Unix(listRequest.End, 0).Format(time.RFC3339))
418
419 commitIDs := []*CommitID{}
420
421 // Do a prefix scan and record all the CommitIDs that match.
422 scan := func(tx *bolt.Tx) error {
423 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME)).Cursor()
424 for k, _ := c.Seek(begin); k != nil && bytes.Compare(k, end) <= 0; k, _ = c.Next() {
425 cid, err := CommitIDFromBytes(k)
426 if err != nil {
427 return fmt.Errorf("scan: Failed to deserialize a commit id: %s", err)
428 }
429 commitIDs = append(commitIDs, cid)
430 }
431 return nil
432 }
433
434 if err := ts.db.View(scan); err != nil {
435 return nil, fmt.Errorf("Failed to scan for commits: %s", err)
436 }
437
438 ret := &ListResponse{
439 Commitids: commitIDs,
440 }
441 return ret, nil
442 }
443
444 func (ts *TraceServiceImpl) GetValues(ctx context.Context, getValuesRequest *Get ValuesRequest) (*GetValuesResponse, error) {
445 if getValuesRequest == nil {
446 return nil, fmt.Errorf("Received nil request.")
447 }
448
449 ret := &GetValuesResponse{
450 Values: map[string][]byte{},
451 }
452
453 // Load the values from the datastore.
454 load := func(tx *bolt.Tx) error {
455 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME))
456 tid := tx.Bucket([]byte(TRACEID_BUCKET_NAME))
457
458 key, err := CommitIDToBytes(getValuesRequest.Commitid)
459 if err != nil {
460 return err
461 }
462 // Load the raw data and convert it into a commitInfo.
463 data, err := newCommitInfo(c.Get(key))
464 if err != nil {
465 return fmt.Errorf("Unable to decode stored values: %s", err)
466 }
467 // Pull data out of commitInfo and put into the GetValuesRespons e.
468 for id64, value := range data.Values {
469 // Look up the traceid from the trace64id, first from th e in-memory
470 // cache, and then from within the BoltDB if not found.
471 ts.mutex.Lock()
472 cached, ok := ts.cache.Get(id64)
473 ts.mutex.Unlock()
474 if ok {
475 ret.Values[cached.(string)] = value
476 } else {
477 if b := tid.Get(bytesFromUint64(id64)); b == nil {
478 return fmt.Errorf("Failed to get traceid for trace64id %d", id64)
479 } else {
480 ret.Values[string(b)] = value
481 }
482 }
483 }
484 return nil
485 }
486 if err := ts.db.View(load); err != nil {
487 return nil, fmt.Errorf("Failed to load data for commitid: %#v, % s", *(getValuesRequest.Commitid), err)
488 }
489
490 return ret, nil
491 }
492
493 func (ts *TraceServiceImpl) GetParams(ctx context.Context, getParamsRequest *Get ParamsRequest) (*GetParamsResponse, error) {
494 if getParamsRequest == nil {
495 return nil, fmt.Errorf("Received nil request.")
496 }
497
498 ret := &GetParamsResponse{
499 Params: map[string]*Params{},
500 }
501 load := func(tx *bolt.Tx) error {
502 t := tx.Bucket([]byte(TRACE_BUCKET_NAME))
503 for _, traceid := range getParamsRequest.Traceids {
504 entry := &StoredEntry{}
505 if err := proto.Unmarshal(t.Get([]byte(traceid)), entry) ; err != nil {
506 return fmt.Errorf("Failed to unmarshal StoredEnt ry proto for %s: %s", traceid, err)
507 }
508 ret.Params[traceid] = entry.Params
509 }
510
511 return nil
512 }
513 if err := ts.db.View(load); err != nil {
514 return nil, fmt.Errorf("GetParams: Failed to load data: %s", err )
515 }
516
517 return ret, nil
518 }
519
520 // Close closes the underlying datastore and it not part of the TraceServiceServ er interface.
521 func (ts *TraceServiceImpl) Close() error {
522 return ts.db.Close()
523 }
OLDNEW
« no previous file with comments | « trace/service/README.md ('k') | trace/service/traceservice.proto » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698