| OLD | NEW |
| (Empty) | |
| 1 package traceservice |
| 2 |
| 3 // Generate the go code from the protocol buffer definitions. |
| 4 //go:generate protoc --go_out=plugins=grpc:. traceservice.proto |
| 5 |
| 6 import ( |
| 7 "bytes" |
| 8 "encoding/binary" |
| 9 "fmt" |
| 10 "strings" |
| 11 "sync" |
| 12 "time" |
| 13 |
| 14 "github.com/boltdb/bolt" |
| 15 "github.com/golang/groupcache/lru" |
| 16 "github.com/golang/protobuf/proto" |
| 17 "github.com/skia-dev/glog" |
| 18 "golang.org/x/net/context" |
| 19 ) |
| 20 |
| 21 const ( |
| 22 COMMIT_BUCKET_NAME = "commits" |
| 23 TRACE_BUCKET_NAME = "traces" |
| 24 TRACEID_BUCKET_NAME = "traceids" |
| 25 LARGEST_TRACEID_KEY = "the largest trace64id" |
| 26 |
| 27 // How many items to keep in the in-memory LRU cache. |
| 28 MAX_INT64_ID_CACHED = 1024 * 1024 |
| 29 ) |
| 30 |
| 31 // bytesFromUint64 converts a uint64 to a []byte. |
| 32 func bytesFromUint64(u uint64) []byte { |
| 33 ret := make([]byte, 8, 8) |
| 34 binary.LittleEndian.PutUint64(ret, u) |
| 35 return ret |
| 36 } |
| 37 |
| 38 // CommitIDToByes serializes the CommitID to a []byte in the same format that Co
mmitIDFromBytes reads. |
| 39 // |
| 40 // The []byte is constructed so that serialized CommitIDs are comparable via byt
es.Compare |
| 41 // with earlier commits coming before later commits. |
| 42 func CommitIDToBytes(c *CommitID) ([]byte, error) { |
| 43 if strings.Contains(c.Id, "!") || strings.Contains(c.Source, "!") { |
| 44 return nil, fmt.Errorf("Invalid CommitID: Must not contain '!':
%#v", *c) |
| 45 } |
| 46 return []byte(fmt.Sprintf("%s!%s!%s", time.Unix(c.Timestamp, 0).Format(t
ime.RFC3339), c.Id, c.Source)), nil |
| 47 } |
| 48 |
| 49 // CommitIDFromBytes creates a CommitID from a []byte, usually produced from a |
| 50 // previously serialized CommitID. See ToBytes. |
| 51 func CommitIDFromBytes(b []byte) (*CommitID, error) { |
| 52 s := string(b) |
| 53 parts := strings.SplitN(s, "!", 3) |
| 54 if len(parts) != 3 { |
| 55 return nil, fmt.Errorf("Invalid CommitID format %s", s) |
| 56 } |
| 57 t, err := time.Parse(time.RFC3339, parts[0]) |
| 58 if err != nil { |
| 59 return nil, fmt.Errorf("Invalid CommitID time format %s: %s", s,
err) |
| 60 } |
| 61 return &CommitID{ |
| 62 Timestamp: t.Unix(), |
| 63 Id: parts[1], |
| 64 Source: parts[2], |
| 65 }, nil |
| 66 } |
| 67 |
| 68 // TraceServiceImpl implements TraceServiceServer. |
| 69 type TraceServiceImpl struct { |
| 70 // db is the BoltDB datastore we actually store the data in. |
| 71 db *bolt.DB |
| 72 |
| 73 // cache is an in-memory LRU cache for traceids and trace64ids. |
| 74 cache *lru.Cache |
| 75 |
| 76 // mutex controls access to cache. |
| 77 mutex sync.Mutex |
| 78 } |
| 79 |
| 80 // NewTraceServiceServer creates a new DB that stores the data in BoltDB format
at |
| 81 // the given filename location. |
| 82 func NewTraceServiceServer(filename string) (*TraceServiceImpl, error) { |
| 83 d, err := bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Seco
nd}) |
| 84 if err != nil { |
| 85 return nil, fmt.Errorf("Failed to open BoltDB at %s: %s", filena
me, err) |
| 86 } |
| 87 createBuckets := func(tx *bolt.Tx) error { |
| 88 _, err := tx.CreateBucketIfNotExists([]byte(COMMIT_BUCKET_NAME)) |
| 89 if err != nil { |
| 90 return fmt.Errorf("Failed to create bucket %s: %s", COMM
IT_BUCKET_NAME, err) |
| 91 } |
| 92 _, err = tx.CreateBucketIfNotExists([]byte(TRACE_BUCKET_NAME)) |
| 93 if err != nil { |
| 94 return fmt.Errorf("Failed to create bucket %s: %s", TRAC
E_BUCKET_NAME, err) |
| 95 } |
| 96 _, err = tx.CreateBucketIfNotExists([]byte(TRACEID_BUCKET_NAME)) |
| 97 if err != nil { |
| 98 return fmt.Errorf("Failed to create bucket %s: %s", TRAC
EID_BUCKET_NAME, err) |
| 99 } |
| 100 return nil |
| 101 } |
| 102 if err := d.Update(createBuckets); err != nil { |
| 103 return nil, fmt.Errorf("Failed to create buckets: %s", err) |
| 104 } |
| 105 return &TraceServiceImpl{ |
| 106 db: d, |
| 107 cache: lru.New(MAX_INT64_ID_CACHED), |
| 108 }, nil |
| 109 } |
| 110 |
| 111 // atomize looks up the 64bit id for the given strings. |
| 112 // |
| 113 // If not found create a new mapping uint64->id and store in the datastore. |
| 114 // Also stores the reverse mapping in the same bucket. |
| 115 func (ts *TraceServiceImpl) atomize(ids []string) (map[string]uint64, error) { |
| 116 ret := map[string]uint64{} |
| 117 |
| 118 // First look up everything in the LRU cache. |
| 119 notcached := []string{} |
| 120 for _, id := range ids { |
| 121 ts.mutex.Lock() |
| 122 cached, ok := ts.cache.Get(id) |
| 123 ts.mutex.Unlock() |
| 124 if ok { |
| 125 ret[id] = cached.(uint64) |
| 126 } else { |
| 127 notcached = append(notcached, id) |
| 128 } |
| 129 } |
| 130 |
| 131 // If we've found all our answers in the LRU cache then we're done. |
| 132 if len(ret) == len(ids) { |
| 133 return ret, nil |
| 134 } |
| 135 |
| 136 // Next look into the datastore and see if we can find the answers there
. |
| 137 notstored := []string{} |
| 138 get := func(tx *bolt.Tx) error { |
| 139 t := tx.Bucket([]byte(TRACEID_BUCKET_NAME)) |
| 140 for _, id := range notcached { |
| 141 // Find the value in the datastore. |
| 142 if bid64 := t.Get([]byte(id)); bid64 == nil { |
| 143 notstored = append(notstored, id) |
| 144 } else { |
| 145 // If found update the return value and the cach
e. |
| 146 id64 := binary.LittleEndian.Uint64(bid64) |
| 147 ts.mutex.Lock() |
| 148 ts.cache.Add(id, id64) |
| 149 ts.cache.Add(id64, id) |
| 150 ts.mutex.Unlock() |
| 151 ret[id] = id64 |
| 152 } |
| 153 } |
| 154 return nil |
| 155 } |
| 156 |
| 157 if err := ts.db.View(get); err != nil { |
| 158 return nil, fmt.Errorf("Error while reading trace ids: %s", err) |
| 159 } |
| 160 |
| 161 if len(ret) == len(ids) { |
| 162 return ret, nil |
| 163 } |
| 164 |
| 165 // If we still have ids that we haven't matching trace64ids for then we
need |
| 166 // to create trace64ids for them and store them in BoltDB and in the LRU |
| 167 // cache. |
| 168 add := func(tx *bolt.Tx) error { |
| 169 t := tx.Bucket([]byte(TRACEID_BUCKET_NAME)) |
| 170 // Find the current largest trace64id. |
| 171 var largest uint64 = 0 |
| 172 if blargest := t.Get([]byte(LARGEST_TRACEID_KEY)); blargest != n
il { |
| 173 largest = binary.LittleEndian.Uint64(blargest) |
| 174 } |
| 175 |
| 176 // Generate a new id for each traceid and store the results. |
| 177 for i, id := range notstored { |
| 178 value := largest + uint64(i) + 1 |
| 179 bvalue := make([]byte, 8, 8) |
| 180 binary.LittleEndian.PutUint64(bvalue, value) |
| 181 if err := t.Put([]byte(id), bvalue); err != nil { |
| 182 return fmt.Errorf("Failed to write atomized valu
e for %s: %s", id, err) |
| 183 } |
| 184 if err := t.Put(bvalue, []byte(id)); err != nil { |
| 185 return fmt.Errorf("Failed to write atomized reve
rse lookup value for %s: %s", id, err) |
| 186 } |
| 187 ts.mutex.Lock() |
| 188 ts.cache.Add(id, value) |
| 189 ts.cache.Add(value, id) |
| 190 ts.mutex.Unlock() |
| 191 ret[id] = value |
| 192 } |
| 193 |
| 194 largest = largest + uint64(len(notstored)) |
| 195 |
| 196 // Write the new value for LARGEST_TRACEID_KEY. |
| 197 blargest := make([]byte, 8, 8) |
| 198 binary.LittleEndian.PutUint64(blargest, largest) |
| 199 if err := t.Put([]byte(LARGEST_TRACEID_KEY), blargest); err != n
il { |
| 200 return fmt.Errorf("Failed to write an updated largest tr
ace64id value: %s", err) |
| 201 } |
| 202 |
| 203 return nil |
| 204 } |
| 205 |
| 206 if err := ts.db.Update(add); err != nil { |
| 207 return nil, fmt.Errorf("Error while writing new trace ids: %s",
err) |
| 208 } |
| 209 |
| 210 if len(ret) == len(ids) { |
| 211 return ret, nil |
| 212 } else { |
| 213 return nil, fmt.Errorf("Failed to add traceid ids: mismatched nu
mber of ids.") |
| 214 } |
| 215 } |
| 216 |
| 217 // commitinfo is the value stored in the commit bucket. |
| 218 type commitinfo struct { |
| 219 Values map[uint64][]byte |
| 220 } |
| 221 |
| 222 // newCommitInfo returns a commitinfo with data deserialized from the byte slice
. |
| 223 func newCommitInfo(volatile []byte) (*commitinfo, error) { |
| 224 ret := &commitinfo{ |
| 225 Values: map[uint64][]byte{}, |
| 226 } |
| 227 if len(volatile) == 0 { |
| 228 return ret, nil |
| 229 } |
| 230 |
| 231 b := make([]byte, len(volatile), len(volatile)) |
| 232 n := copy(b, volatile) |
| 233 if n != len(volatile) { |
| 234 return nil, fmt.Errorf("Failed to copy all the bytes.") |
| 235 } |
| 236 // The byte slice is structured as a repeating set of: |
| 237 // |
| 238 // [uint64 (8 bytes)][length of the value (1 byte)][value (0-256 bytes,
as determined by previous byte)] |
| 239 // |
| 240 for len(b) > 0 { |
| 241 if len(b) < 9 { |
| 242 return nil, fmt.Errorf("Failed to decode, not enough byt
es left: %#v", b) |
| 243 } |
| 244 key := binary.LittleEndian.Uint64(b[0:8]) |
| 245 length := b[8] |
| 246 b = b[9:] |
| 247 if len(b) < int(length) { |
| 248 return nil, fmt.Errorf("Failed to decode, not enough byt
es left for %d: Want %d Got %d", key, length, len(b)) |
| 249 } |
| 250 ret.Values[key] = b[0:length] |
| 251 b = b[length:] |
| 252 } |
| 253 return ret, nil |
| 254 } |
| 255 |
| 256 // ToBytes serializes the data in the commitinfo into a byte slice. The format i
s ingestable by FromBytes. |
| 257 // |
| 258 // The byte slice is structured as a repeating set of three things serialized as
bytes. |
| 259 // |
| 260 // 1. uint64 |
| 261 // 2. length of the value |
| 262 // 3. the actual value. |
| 263 // |
| 264 // So in the byte slice this would look like: |
| 265 // |
| 266 // [uint64 (8 bytes)][length of the value (1 byte)][value (0-255 bytes, as det
ermined by previous byte)] |
| 267 // |
| 268 func (c *commitinfo) ToBytes() []byte { |
| 269 size := 0 |
| 270 for _, v := range c.Values { |
| 271 size += 9 + len(v) |
| 272 } |
| 273 buf := make([]byte, 0, size) |
| 274 for k, v := range c.Values { |
| 275 buf = append(buf, bytesFromUint64(k)...) |
| 276 buf = append(buf, byte(len(v))) |
| 277 buf = append(buf, v...) |
| 278 } |
| 279 return buf |
| 280 } |
| 281 |
| 282 func (ts *TraceServiceImpl) MissingParams(ctx context.Context, in *MissingParams
Request) (*MissingParamsResponse, error) { |
| 283 resp := &MissingParamsResponse{ |
| 284 Traceids: []string{}, |
| 285 } |
| 286 |
| 287 // Populate the response with traceids we can't find in the bucket. |
| 288 get := func(tx *bolt.Tx) error { |
| 289 t := tx.Bucket([]byte(TRACE_BUCKET_NAME)) |
| 290 for _, traceid := range in.Traceids { |
| 291 if b := t.Get([]byte(traceid)); b == nil { |
| 292 resp.Traceids = append(resp.Traceids, traceid) |
| 293 } |
| 294 } |
| 295 return nil |
| 296 } |
| 297 if err := ts.db.View(get); err != nil { |
| 298 return nil, fmt.Errorf("Failed to add values to tracedb: %s", er
r) |
| 299 } |
| 300 return resp, nil |
| 301 } |
| 302 |
| 303 func (ts *TraceServiceImpl) AddParams(ctx context.Context, in *AddParamsRequest)
(*EmptyResponse, error) { |
| 304 // Serialize the Params for each trace as a proto and collect the tracei
ds. |
| 305 // We do this outside the add func so there's less work taking place in
the |
| 306 // Update transaction. |
| 307 params := map[string][]byte{} |
| 308 var err error |
| 309 for key, value := range in.Params { |
| 310 ti := &StoredEntry{ |
| 311 Params: value, |
| 312 } |
| 313 params[key], err = proto.Marshal(ti) |
| 314 if err != nil { |
| 315 return nil, fmt.Errorf("Failed to serialize the Params:
%s", err) |
| 316 } |
| 317 } |
| 318 |
| 319 // Add the Params for each traceid to the bucket. |
| 320 add := func(tx *bolt.Tx) error { |
| 321 t := tx.Bucket([]byte(TRACE_BUCKET_NAME)) |
| 322 for traceid, _ := range in.Params { |
| 323 if err := t.Put([]byte(traceid), params[traceid]); err !
= nil { |
| 324 return fmt.Errorf("Failed to write the trace inf
o for %s: %s", traceid, err) |
| 325 } |
| 326 } |
| 327 return nil |
| 328 } |
| 329 if err := ts.db.Update(add); err != nil { |
| 330 return nil, fmt.Errorf("Failed to add values to tracedb: %s", er
r) |
| 331 } |
| 332 return &EmptyResponse{}, nil |
| 333 } |
| 334 |
| 335 func (ts *TraceServiceImpl) Add(ctx context.Context, in *AddRequest) (*EmptyResp
onse, error) { |
| 336 glog.Info("Add() begin.") |
| 337 if in == nil { |
| 338 return nil, fmt.Errorf("Received nil request.") |
| 339 } |
| 340 if in.Commitid == nil { |
| 341 return nil, fmt.Errorf("Received nil CommitID") |
| 342 } |
| 343 if in.Entries == nil { |
| 344 return nil, fmt.Errorf("Received nil Entries") |
| 345 } |
| 346 |
| 347 // Get the trace64ids for each traceid. |
| 348 keys := []string{} |
| 349 for key, _ := range in.Entries { |
| 350 keys = append(keys, key) |
| 351 } |
| 352 |
| 353 trace64ids, err := ts.atomize(keys) |
| 354 glog.Infof("atomized %d keys", len(trace64ids)) |
| 355 if err != nil { |
| 356 return nil, fmt.Errorf("Failed to create short trace ids: %s", e
rr) |
| 357 } |
| 358 |
| 359 add := func(tx *bolt.Tx) error { |
| 360 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME)) |
| 361 |
| 362 // Write the commitinfo. |
| 363 key, err := CommitIDToBytes(in.Commitid) |
| 364 if err != nil { |
| 365 return err |
| 366 } |
| 367 |
| 368 // First load the existing info. |
| 369 data, err := newCommitInfo(c.Get(key)) |
| 370 if err != nil { |
| 371 return fmt.Errorf("Unable to decode stored values: %s",
err) |
| 372 } |
| 373 |
| 374 // Add our new data points. |
| 375 for key, entry := range in.Entries { |
| 376 data.Values[trace64ids[key]] = entry |
| 377 } |
| 378 |
| 379 // Write to the datastore. |
| 380 if err := c.Put(key, data.ToBytes()); err != nil { |
| 381 return fmt.Errorf("Failed to write the trace info for %s
: %s", key, err) |
| 382 } |
| 383 return nil |
| 384 } |
| 385 |
| 386 if err := ts.db.Update(add); err != nil { |
| 387 return nil, fmt.Errorf("Failed to add values to tracedb: %s", er
r) |
| 388 } |
| 389 return &EmptyResponse{}, nil |
| 390 } |
| 391 |
| 392 func (ts *TraceServiceImpl) Remove(ctx context.Context, in *RemoveRequest) (*Emp
tyResponse, error) { |
| 393 if in == nil { |
| 394 return nil, fmt.Errorf("Received nil request.") |
| 395 } |
| 396 remove := func(tx *bolt.Tx) error { |
| 397 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME)) |
| 398 key, err := CommitIDToBytes(in.Commitid) |
| 399 if err != nil { |
| 400 return err |
| 401 } |
| 402 return c.Delete(key) |
| 403 } |
| 404 if err := ts.db.Update(remove); err != nil { |
| 405 return nil, fmt.Errorf("Failed to remove values from tracedb: %s
", err) |
| 406 } |
| 407 ret := &EmptyResponse{} |
| 408 return ret, nil |
| 409 } |
| 410 |
| 411 func (ts *TraceServiceImpl) List(ctx context.Context, listRequest *ListRequest)
(*ListResponse, error) { |
| 412 if listRequest == nil { |
| 413 return nil, fmt.Errorf("Received nil request.") |
| 414 } |
| 415 // Convert the begin and end timestamps into RFC3339 strings we can use
for prefix matching. |
| 416 begin := []byte(time.Unix(listRequest.Begin, 0).Format(time.RFC3339)) |
| 417 end := []byte(time.Unix(listRequest.End, 0).Format(time.RFC3339)) |
| 418 |
| 419 commitIDs := []*CommitID{} |
| 420 |
| 421 // Do a prefix scan and record all the CommitIDs that match. |
| 422 scan := func(tx *bolt.Tx) error { |
| 423 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME)).Cursor() |
| 424 for k, _ := c.Seek(begin); k != nil && bytes.Compare(k, end) <=
0; k, _ = c.Next() { |
| 425 cid, err := CommitIDFromBytes(k) |
| 426 if err != nil { |
| 427 return fmt.Errorf("scan: Failed to deserialize a
commit id: %s", err) |
| 428 } |
| 429 commitIDs = append(commitIDs, cid) |
| 430 } |
| 431 return nil |
| 432 } |
| 433 |
| 434 if err := ts.db.View(scan); err != nil { |
| 435 return nil, fmt.Errorf("Failed to scan for commits: %s", err) |
| 436 } |
| 437 |
| 438 ret := &ListResponse{ |
| 439 Commitids: commitIDs, |
| 440 } |
| 441 return ret, nil |
| 442 } |
| 443 |
| 444 func (ts *TraceServiceImpl) GetValues(ctx context.Context, getValuesRequest *Get
ValuesRequest) (*GetValuesResponse, error) { |
| 445 if getValuesRequest == nil { |
| 446 return nil, fmt.Errorf("Received nil request.") |
| 447 } |
| 448 |
| 449 ret := &GetValuesResponse{ |
| 450 Values: map[string][]byte{}, |
| 451 } |
| 452 |
| 453 // Load the values from the datastore. |
| 454 load := func(tx *bolt.Tx) error { |
| 455 c := tx.Bucket([]byte(COMMIT_BUCKET_NAME)) |
| 456 tid := tx.Bucket([]byte(TRACEID_BUCKET_NAME)) |
| 457 |
| 458 key, err := CommitIDToBytes(getValuesRequest.Commitid) |
| 459 if err != nil { |
| 460 return err |
| 461 } |
| 462 // Load the raw data and convert it into a commitInfo. |
| 463 data, err := newCommitInfo(c.Get(key)) |
| 464 if err != nil { |
| 465 return fmt.Errorf("Unable to decode stored values: %s",
err) |
| 466 } |
| 467 // Pull data out of commitInfo and put into the GetValuesRespons
e. |
| 468 for id64, value := range data.Values { |
| 469 // Look up the traceid from the trace64id, first from th
e in-memory |
| 470 // cache, and then from within the BoltDB if not found. |
| 471 ts.mutex.Lock() |
| 472 cached, ok := ts.cache.Get(id64) |
| 473 ts.mutex.Unlock() |
| 474 if ok { |
| 475 ret.Values[cached.(string)] = value |
| 476 } else { |
| 477 if b := tid.Get(bytesFromUint64(id64)); b == nil
{ |
| 478 return fmt.Errorf("Failed to get traceid
for trace64id %d", id64) |
| 479 } else { |
| 480 ret.Values[string(b)] = value |
| 481 } |
| 482 } |
| 483 } |
| 484 return nil |
| 485 } |
| 486 if err := ts.db.View(load); err != nil { |
| 487 return nil, fmt.Errorf("Failed to load data for commitid: %#v, %
s", *(getValuesRequest.Commitid), err) |
| 488 } |
| 489 |
| 490 return ret, nil |
| 491 } |
| 492 |
| 493 func (ts *TraceServiceImpl) GetParams(ctx context.Context, getParamsRequest *Get
ParamsRequest) (*GetParamsResponse, error) { |
| 494 if getParamsRequest == nil { |
| 495 return nil, fmt.Errorf("Received nil request.") |
| 496 } |
| 497 |
| 498 ret := &GetParamsResponse{ |
| 499 Params: map[string]*Params{}, |
| 500 } |
| 501 load := func(tx *bolt.Tx) error { |
| 502 t := tx.Bucket([]byte(TRACE_BUCKET_NAME)) |
| 503 for _, traceid := range getParamsRequest.Traceids { |
| 504 entry := &StoredEntry{} |
| 505 if err := proto.Unmarshal(t.Get([]byte(traceid)), entry)
; err != nil { |
| 506 return fmt.Errorf("Failed to unmarshal StoredEnt
ry proto for %s: %s", traceid, err) |
| 507 } |
| 508 ret.Params[traceid] = entry.Params |
| 509 } |
| 510 |
| 511 return nil |
| 512 } |
| 513 if err := ts.db.View(load); err != nil { |
| 514 return nil, fmt.Errorf("GetParams: Failed to load data: %s", err
) |
| 515 } |
| 516 |
| 517 return ret, nil |
| 518 } |
| 519 |
| 520 // Close closes the underlying datastore and it not part of the TraceServiceServ
er interface. |
| 521 func (ts *TraceServiceImpl) Close() error { |
| 522 return ts.db.Close() |
| 523 } |
| OLD | NEW |