Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(70)

Side by Side Diff: traceservice/proto/impl.go

Issue 1411663004: Create gRPC client and server, traceservice, that stores trace data in a BoltDB backend. (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: clean Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 package traceservice
2
3 // Generate the go code from the protocol buffer definitions.
4 //go:generate protoc --go_out=plugins=grpc:. traceservice.proto
5
6 import (
7 "bytes"
8 "encoding/binary"
9 "fmt"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/boltdb/bolt"
15 "github.com/golang/groupcache/lru"
16 "github.com/golang/protobuf/proto"
17 "github.com/skia-dev/glog"
18 "golang.org/x/net/context"
19 )
20
21 const (
22 COMMIT_BUCKET_NAME = "commits"
23 TRACE_BUCKET_NAME = "traces"
24 TRACEID_BUCKET_NAME = "traceids"
25 LARGEST_TRACEID_KEY = "the largest trace64id"
26
27 // How many items to keep in the in-memory LRU cache.
28 MAX_INT64_ID_CACHED = 1024 * 1024
29 )
30
31 // BytesFromInt64 converts a uint64 to a []byte.
32 func BytesFromInt64(u uint64) []byte {
stephana 2015/10/19 20:00:19 nit: Should this be named something like BytesFrom
jcgregorio 2015/10/19 20:30:11 Changed to Uint and made private. On 2015/10/19 a
33 ret := make([]byte, 8, 8)
34 binary.LittleEndian.PutUint64(ret, u)
35 return ret
36 }
37
38 // BytesFromInt16 converts a uint16 into a []byte.
39 func BytesFromInt16(i uint16) []byte {
40 ret := make([]byte, 2, 2)
41 binary.LittleEndian.PutUint16(ret, i)
42 return ret
43 }
44
45 // CommitIDToByes serializes the CommitID to a []byte in the same format that Co mmitIDFromBytes reads.
46 //
47 // The []byte is constructed so that serialized CommitIDs are comparable via byt es.Compare
48 // with earlier commits coming before later commits.
49 func CommitIDToBytes(c *CommitID) []byte {
stephana 2015/10/19 20:00:19 There is the (probably unlikely) possibility that
jcgregorio 2015/10/19 20:30:11 Good catch, added a check for ! and return an erro
50 return []byte(fmt.Sprintf("%s!%s!%s", time.Unix(c.Timestamp, 0).Format(t ime.RFC3339), c.Id, c.Source))
51 }
52
53 // CommitIDFromBytes creates a CommitID from a []byte, usually produced from a
54 // previously serialized CommitID. See ToBytes.
55 func CommitIDFromBytes(b []byte) (*CommitID, error) {
56 s := string(b)
57 parts := strings.SplitN(s, "!", 3)
58 if len(parts) != 3 {
59 return nil, fmt.Errorf("Invalid CommitID format %s", s)
60 }
61 t, err := time.Parse(time.RFC3339, parts[0])
62 if err != nil {
63 return nil, fmt.Errorf("Invalid CommitID time format %s: %s", s, err)
64 }
65 return &CommitID{
66 Timestamp: t.Unix(),
67 Id: parts[1],
68 Source: parts[2],
69 }, nil
70 }
71
72 // TraceServiceImpl implements TraceServiceServer.
73 type TraceServiceImpl struct {
74 // db is the BoltDB datastore we actually store the data in.
75 db *bolt.DB
76
77 // cache is an in-memory LRU cache for traceids and trace64ids.
78 cache *lru.Cache
79
80 // mutex controls access to cache.
81 mutex sync.Mutex
82 }
83
84 // NewTraceServiceServer creates a new DB that stores the data in BoltDB format at
85 // the given filename location.
86 func NewTraceServiceServer(filename string) (*TraceServiceImpl, error) {
87 d, err := bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Seco nd})
88 if err != nil {
89 return nil, fmt.Errorf("Failed to open BoltDB at %s: %s", filena me, err)
90 }
91 createBuckets := func(tx *bolt.Tx) error {
92 _, err := tx.CreateBucketIfNotExists([]byte(COMMIT_BUCKET_NAME))
93 if err != nil {
94 return fmt.Errorf("Failed to create bucket %s: %s", COMM IT_BUCKET_NAME, err)
95 }
96 _, err = tx.CreateBucketIfNotExists([]byte(TRACE_BUCKET_NAME))
97 if err != nil {
98 return fmt.Errorf("Failed to create bucket %s: %s", TRAC E_BUCKET_NAME, err)
99 }
100 _, err = tx.CreateBucketIfNotExists([]byte(TRACEID_BUCKET_NAME))
101 if err != nil {
102 return fmt.Errorf("Failed to create bucket %s: %s", TRAC EID_BUCKET_NAME, err)
103 }
104 return nil
105 }
106 if err := d.Update(createBuckets); err != nil {
107 return nil, fmt.Errorf("Failed to create buckets: %s", err)
108 }
109 return &TraceServiceImpl{
110 db: d,
111 cache: lru.New(MAX_INT64_ID_CACHED),
112 }, nil
113 }
114
115 // atomize looks up the 64bit id for the given strings.
116 //
117 // If not found create a new mapping uint64->id and store in the datastore.
118 // Also stores the reverse mapping in the same bucket.
119 func (ts *TraceServiceImpl) atomize(ids []string) (map[string]uint64, error) {
120 ret := map[string]uint64{}
121
122 // First look up everything in the LRU cache.
123 notcached := []string{}
124 for _, id := range ids {
125 ts.mutex.Lock()
126 cached, ok := ts.cache.Get(id)
127 ts.mutex.Unlock()
128 if ok {
129 ret[id] = cached.(uint64)
130 } else {
131 notcached = append(notcached, id)
132 }
133 }
134
135 // If we've found all our answers in the LRU cache then we're done.
136 if len(ret) == len(ids) {
137 return ret, nil
138 }
139
140 // Next look into the datastore and see if we can find the answers there .
141 notstored := []string{}
142 get := func(tx *bolt.Tx) error {
143 t := tx.Bucket([]byte(TRACEID_BUCKET_NAME))
144 for _, id := range notcached {
145 // Find the value in the datastore.
146 if bid64 := t.Get([]byte(id)); bid64 == nil {
147 notstored = append(notstored, id)
148 } else {
149 // If found update the return value and the cach e.
150 id64 := binary.LittleEndian.Uint64(bid64)
151 ts.mutex.Lock()
152 ts.cache.Add(id, id64)
153 ts.cache.Add(id64, id)
154 ts.mutex.Unlock()
155 ret[id] = id64
156 }
157 }
158 return nil
159 }
160
161 if err := ts.db.View(get); err != nil {
162 return nil, fmt.Errorf("Error while reading trace ids: %s", err)
163 }
164
165 if len(ret) == len(ids) {
166 return ret, nil
167 }
168
169 // If we still have ids that we haven't matching trace64ids for then we need
170 // to create trace64ids for them and store them in BoltDB and in the LRU
171 // cache.
172 add := func(tx *bolt.Tx) error {
173 t := tx.Bucket([]byte(TRACEID_BUCKET_NAME))
174 // Find the current largest trace64id.
175 var largest uint64 = 0
176 if blargest := t.Get([]byte(LARGEST_TRACEID_KEY)); blargest != n il {
177 largest = binary.LittleEndian.Uint64(blargest)
178 }
179
180 // Generate a new id for each traceid and store the results.
181 for i, id := range notstored {
182 value := largest + uint64(i) + 1
183 bvalue := make([]byte, 8, 8)
184 binary.LittleEndian.PutUint64(bvalue, value)
185 if err := t.Put([]byte(id), bvalue); err != nil {
186 return fmt.Errorf("Failed to write atomized valu e for %s: %s", id, err)
187 }
188 if err := t.Put(bvalue, []byte(id)); err != nil {
189 return fmt.Errorf("Failed to write atomized reve rse lookup value for %s: %s", id, err)
190 }
191 ts.mutex.Lock()
192 ts.cache.Add(id, value)
193 ts.cache.Add(value, id)
194 ts.mutex.Unlock()
195 ret[id] = value
196 }
197
198 largest = largest + uint64(len(notstored))
199
200 // Write the new value for LARGEST_TRACEID_KEY.
201 blargest := make([]byte, 8, 8)
202 binary.LittleEndian.PutUint64(blargest, largest)
203 if err := t.Put([]byte(LARGEST_TRACEID_KEY), blargest); err != n il {
204 return fmt.Errorf("Failed to write an updated largest tr ace64id value: %s", err)
205 }
206
207 return nil
208 }
209
210 if err := ts.db.Update(add); err != nil {
211 return nil, fmt.Errorf("Error while writing new trace ids: %s", err)
212 }
213
214 if len(ret) == len(ids) {
215 return ret, nil
216 } else {
217 return nil, fmt.Errorf("Failed to add traceid ids: mismatched nu mber of ids.")
218 }
219 }
220
221 // commitinfo is the value stored in the commit bucket.
222 type commitinfo struct {
223 Values map[uint64][]byte
224 }
225
226 // newCommitInfo returns a commitinfo with data deserialized from the byte slice .
227 func newCommitInfo(volatile []byte) (*commitinfo, error) {
228 ret := &commitinfo{
229 Values: map[uint64][]byte{},
230 }
231 if len(volatile) == 0 {
232 return ret, nil
233 }
234
235 b := make([]byte, len(volatile), len(volatile))
236 n := copy(b, volatile)
237 if n != len(volatile) {
238 return nil, fmt.Errorf("Failed to copy all the bytes.")
239 }
240 // The byte slice is structured as a repeating set of:
241 //
242 // [uint64 (8 bytes)][length of the value (1 byte)][value (0-256 bytes, as determined by previous byte)]
243 //
244 for len(b) > 0 {
245 if len(b) < 9 {
246 return nil, fmt.Errorf("Failed to decode, not enough byt es left: %#v", b)
247 }
248 key := binary.LittleEndian.Uint64(b[0:8])
249 length := b[8]
250 b = b[9:]
251 if len(b) < int(length) {
252 return nil, fmt.Errorf("Failed to decode, not enough byt es left for %d: Want %d Got %d", key, length, len(b))
253 }
254 ret.Values[key] = b[0:length]
255 b = b[length:]
256 }
257 return ret, nil
258 }
259
260 // ToBytes serializes the data in the commitinfo into a byte slice. The format i s ingestable by FromBytes.
261 //
262 // The byte slice is structured as a repeating set of three things serialized as bytes.
263 //
264 // 1. uint64
265 // 2. length of the value
266 // 3. the actual value.
267 //
268 // So in the byte slice this would look like:
269 //
270 // [uint64 (8 bytes)][length of the value (1 byte)][value (0-255 bytes, as det ermined by previous byte)]
271 //
272 func (c *commitinfo) ToBytes() []byte {
273 size := 0
274 for _, v := range c.Values {
275 size += 9 + len(v)
276 }
277 buf := make([]byte, 0, size)
278 for k, v := range c.Values {
279 buf = append(buf, BytesFromInt64(k)...)
280 buf = append(buf, byte(len(v)))
281 buf = append(buf, v...)
282 }
283 return buf
284 }
285
286 func (ts *TraceServiceImpl) MissingParams(ctx context.Context, in *MissingParams Request) (*MissingParamsResponse, error) {
287 resp := &MissingParamsResponse{
288 Traceids: []string{},
289 }
290
291 // Populate the response with traceids we can't find in the bucket.
292 get := func(tx *bolt.Tx) error {
293 t := tx.Bucket([]byte(TRACE_BUCKET_NAME))
294 for _, traceid := range in.Traceids {
295 if b := t.Get([]byte(traceid)); b == nil {
296 resp.Traceids = append(resp.Traceids, traceid)
297 }
298 }
299 return nil
300 }
301 if err := ts.db.View(get); err != nil {
302 return nil, fmt.Errorf("Failed to add values to tracedb: %s", er r)
303 }
304 return resp, nil
305 }
306
307 func (ts *TraceServiceImpl) AddParams(ctx context.Context, in *AddParamsRequest) (*EmptyResponse, error) {
308 // Serialize the Params for each trace as a proto and collect the tracei ds.
309 // We do this outside the add func so there's less work taking place in the
310 // Update transaction.
311 params := map[string][]byte{}
312 var err error
313 for key, value := range in.Params {
314 ti := &StoredEntry{
315 Params: value,
316 }
317 params[key], err = proto.Marshal(ti)
318 if err != nil {
319 return nil, fmt.Errorf("Failed to serialize the Params: %s", err)
320 }
321 }
322
323 // Add the Params for each traceid to the bucket.
324 add := func(tx *bolt.Tx) error {
325 t := tx.Bucket([]byte(TRACE_BUCKET_NAME))
326 for traceid, _ := range in.Params {
327 if err := t.Put([]byte(traceid), params[traceid]); err ! = nil {
328 return fmt.Errorf("Failed to write the trace inf o for %s: %s", traceid, err)
329 }
330 }
331 return nil
332 }
333 if err := ts.db.Update(add); err != nil {
334 return nil, fmt.Errorf("Failed to add values to tracedb: %s", er r)
335 }
336 return &EmptyResponse{}, nil
337 }
338
339 func (ts *TraceServiceImpl) Add(ctx context.Context, in *AddRequest) (*EmptyResp onse, error) {
340 glog.Info("Add() begin.")
341 if in == nil {
342 return nil, fmt.Errorf("Received nil request.")
343 }
344 if in.Commitid == nil {
345 return nil, fmt.Errorf("Received nil CommitID")
346 }
347 if in.Entries == nil {
348 return nil, fmt.Errorf("Received nil Entries")
349 }
350
351 // Get the trace64ids for each traceid.
352 keys := []string{}
353 for key, _ := range in.Entries {
354 keys = append(keys, key)
355 }
356
357 trace64ids, err := ts.atomize(keys)
358 glog.Infof("atomized %d keys", len(trace64ids))
359 if err != nil {
360 return nil, fmt.Errorf("Failed to create short trace ids: %d", e rr)
361 }
362
363 add := func(tx *bolt.Tx) error {
364 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME))
365
366 // Write the commitinfo.
367 key := CommitIDToBytes(in.Commitid)
368
369 // First load the existing info.
370 data, err := newCommitInfo(c.Get(key))
371 if err != nil {
372 return fmt.Errorf("Unable to decode stored values: %s", err)
373 }
374
375 // Add our new data points.
376 for key, entry := range in.Entries {
377 data.Values[trace64ids[key]] = entry
378 }
379
380 // Write to the datastore.
381 if err := c.Put(key, data.ToBytes()); err != nil {
382 return fmt.Errorf("Failed to write the trace info for %s : %s", key, err)
383 }
384 return nil
385 }
386
387 if err := ts.db.Update(add); err != nil {
388 return nil, fmt.Errorf("Failed to add values to tracedb: %s", er r)
389 }
390 return &EmptyResponse{}, nil
391 }
392
393 func (ts *TraceServiceImpl) Remove(ctx context.Context, in *RemoveRequest) (*Emp tyResponse, error) {
394 if in == nil {
395 return nil, fmt.Errorf("Received nil request.")
396 }
397 remove := func(tx *bolt.Tx) error {
398 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME))
399 key := CommitIDToBytes(in.Commitid)
400 return c.Delete(key)
401 }
402 if err := ts.db.Update(remove); err != nil {
403 return nil, fmt.Errorf("Failed to remove values from tracedb: %s ", err)
404 }
405 ret := &EmptyResponse{}
406 return ret, nil
407 }
408
409 func (ts *TraceServiceImpl) List(ctx context.Context, listRequest *ListRequest) (*ListResponse, error) {
410 if listRequest == nil {
411 return nil, fmt.Errorf("Received nil request.")
412 }
413 // Convert the begin and end timestamps into RFC3339 strings we can use for prefix matching.
414 begin := []byte(time.Unix(listRequest.Begin, 0).Format(time.RFC3339))
415 end := []byte(time.Unix(listRequest.End, 0).Format(time.RFC3339))
416
417 commitIDs := []*CommitID{}
418
419 // Do a prefix scan and record all the CommitIDs that match.
420 scan := func(tx *bolt.Tx) error {
421 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME)).Cursor()
422 for k, _ := c.Seek(begin); k != nil && bytes.Compare(k, end) <= 0; k, _ = c.Next() {
423 cid, err := CommitIDFromBytes(k)
424 if err != nil {
425 return fmt.Errorf("scan: Failed to deserialize a commit id: %s", err)
426 }
427 commitIDs = append(commitIDs, cid)
428 }
429 return nil
430 }
431
432 if err := ts.db.View(scan); err != nil {
433 return nil, fmt.Errorf("Failed to scan for commits: %s", err)
434 }
435
436 ret := &ListResponse{
437 Commitids: commitIDs,
438 }
439 return ret, nil
440 }
441
442 func (ts *TraceServiceImpl) GetValues(ctx context.Context, getValuesRequest *Get ValuesRequest) (*GetValuesResponse, error) {
443 if getValuesRequest == nil {
444 return nil, fmt.Errorf("Received nil request.")
445 }
446
447 ret := &GetValuesResponse{
448 Values: map[string][]byte{},
449 }
450
451 // Load the values from the datastore.
452 load := func(tx *bolt.Tx) error {
453 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME))
454 tid := tx.Bucket([]byte(TRACEID_BUCKET_NAME))
455
456 key := CommitIDToBytes(getValuesRequest.Commitid)
457 // Load the raw data and convert it into a commitInfo.
458 data, err := newCommitInfo(c.Get(key))
459 if err != nil {
460 return fmt.Errorf("Unable to decode stored values: %s", err)
461 }
462 // Pull data out of commitInfo and put into the GetValuesRespons e.
463 for id64, value := range data.Values {
464 // Look up the traceid from the trace64id, first from th e in-memory
465 // cache, and then from within the BoltDB if not found.
466 ts.mutex.Lock()
467 cached, ok := ts.cache.Get(id64)
468 ts.mutex.Unlock()
469 if ok {
470 ret.Values[cached.(string)] = value
471 } else {
472 if b := tid.Get(BytesFromInt64(id64)); b == nil {
473 return fmt.Errorf("Failed to get traceid for trace64id %d", id64)
474 } else {
475 ret.Values[string(b)] = value
476 }
477 }
478 }
479 return nil
480 }
481 if err := ts.db.View(load); err != nil {
482 return nil, fmt.Errorf("Failed to load data for commitid: %#v, % s", *(getValuesRequest.Commitid), err)
483 }
484
485 return ret, nil
486 }
487
488 func (ts *TraceServiceImpl) GetParams(ctx context.Context, getParamsRequest *Get ParamsRequest) (*GetParamsResponse, error) {
489 if getParamsRequest == nil {
490 return nil, fmt.Errorf("Received nil request.")
491 }
492
493 ret := &GetParamsResponse{
494 Params: map[string]*Params{},
495 }
496 load := func(tx *bolt.Tx) error {
497 t := tx.Bucket([]byte(TRACE_BUCKET_NAME))
498 for _, traceid := range getParamsRequest.Traceids {
499 entry := &StoredEntry{}
500 if err := proto.Unmarshal(t.Get([]byte(traceid)), entry) ; err != nil {
501 return fmt.Errorf("Failed to unmarshal StoredEnt ry proto for %s: %s", traceid, err)
502 }
503 ret.Params[traceid] = entry.Params
504 }
505
506 return nil
507 }
508 if err := ts.db.View(load); err != nil {
509 return nil, fmt.Errorf("GetParams: Failed to load data: %s", err )
510 }
511
512 return ret, nil
513 }
514
515 // Close closes the underlying datastore and it not part of the TraceServiceServ er interface.
516 func (ts *TraceServiceImpl) Close() error {
517 return ts.db.Close()
518 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698