| OLD | NEW |
| 1 // Package tracedb provides a datastore for efficiently storing and retrieving t
races. | 1 // Package tracedb provides a datastore for efficiently storing and retrieving t
races. |
| 2 package db | 2 package db |
| 3 | 3 |
| 4 import ( | 4 import ( |
| 5 "fmt" | 5 "fmt" |
| 6 "sync" | 6 "sync" |
| 7 "time" | 7 "time" |
| 8 | 8 |
| 9 "github.com/golang/groupcache/lru" | 9 "github.com/golang/groupcache/lru" |
| 10 "github.com/skia-dev/glog" | 10 "github.com/skia-dev/glog" |
| (...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 121 return ret, nil | 121 return ret, nil |
| 122 } | 122 } |
| 123 | 123 |
| 124 func (ts *TsDB) ping() error { | 124 func (ts *TsDB) ping() error { |
| 125 _, err := ts.traceService.Ping(ts.ctx, &traceservice.Empty{}) | 125 _, err := ts.traceService.Ping(ts.ctx, &traceservice.Empty{}) |
| 126 return err | 126 return err |
| 127 } | 127 } |
| 128 | 128 |
| 129 // addChunk adds a set of entries to the datastore at the given CommitID. | 129 // addChunk adds a set of entries to the datastore at the given CommitID. |
| 130 func (ts *TsDB) addChunk(ctx context.Context, cid *traceservice.CommitID, chunk
map[string]*Entry) error { | 130 func (ts *TsDB) addChunk(ctx context.Context, cid *traceservice.CommitID, chunk
map[string]*Entry) error { |
| 131 if len(chunk) == 0 { |
| 132 return nil |
| 133 } |
| 131 addReq := &traceservice.AddRequest{ | 134 addReq := &traceservice.AddRequest{ |
| 132 Commitid: cid, | 135 Commitid: cid, |
| 133 Values: []*traceservice.ValuePair{}, | 136 Values: []*traceservice.ValuePair{}, |
| 134 } | 137 } |
| 135 addParamsRequest := &traceservice.AddParamsRequest{ | 138 addParamsRequest := &traceservice.AddParamsRequest{ |
| 136 Params: []*traceservice.ParamsPair{}, | 139 Params: []*traceservice.ParamsPair{}, |
| 137 } | 140 } |
| 138 for traceid, entry := range chunk { | 141 for traceid, entry := range chunk { |
| 139 // Check that all the traceids have their Params. | 142 // Check that all the traceids have their Params. |
| 140 if _, ok := ts.cache.Get(traceid); !ok { | 143 if _, ok := ts.cache.Get(traceid); !ok { |
| 141 addParamsRequest.Params = append(addParamsRequest.Params
, &traceservice.ParamsPair{ | 144 addParamsRequest.Params = append(addParamsRequest.Params
, &traceservice.ParamsPair{ |
| 142 Key: traceid, | 145 Key: traceid, |
| 143 Params: entry.Params, | 146 Params: entry.Params, |
| 144 }) | 147 }) |
| 145 ts.cache.Add(traceid, true) | 148 ts.cache.Add(traceid, true) |
| 146 } | 149 } |
| 147 addReq.Values = append(addReq.Values, &traceservice.ValuePair{ | 150 addReq.Values = append(addReq.Values, &traceservice.ValuePair{ |
| 148 Key: traceid, | 151 Key: traceid, |
| 149 Value: entry.Value, | 152 Value: entry.Value, |
| 150 }) | 153 }) |
| 151 } | 154 } |
| 152 if len(addParamsRequest.Params) > 0 { | 155 if len(addParamsRequest.Params) > 0 { |
| 153 » » _, err := ts.traceService.AddParams(ctx, addParamsRequest) | 156 » » if _, err := ts.traceService.AddParams(ctx, addParamsRequest); e
rr != nil { |
| 154 » » if err != nil { | |
| 155 return fmt.Errorf("Failed to add params: %s", err) | 157 return fmt.Errorf("Failed to add params: %s", err) |
| 156 } | 158 } |
| 157 } | 159 } |
| 158 if _, err := ts.traceService.Add(ctx, addReq); err != nil { | 160 if _, err := ts.traceService.Add(ctx, addReq); err != nil { |
| 159 return fmt.Errorf("Failed to add values: %s", err) | 161 return fmt.Errorf("Failed to add values: %s", err) |
| 160 } | 162 } |
| 161 return nil | 163 return nil |
| 162 } | 164 } |
| 163 | 165 |
| 164 // tsCommitID converts a db.CommitID to traceservice.CommitID. | 166 // tsCommitID converts a db.CommitID to traceservice.CommitID. |
| (...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 268 // Load the values for the commit. | 270 // Load the values for the commit. |
| 269 getValuesRequest := &traceservice.GetValuesRequest{ | 271 getValuesRequest := &traceservice.GetValuesRequest{ |
| 270 Commitid: tsCommitID(cid), | 272 Commitid: tsCommitID(cid), |
| 271 } | 273 } |
| 272 getValuesResponse, err := ts.traceService.GetValues(ctx,
getValuesRequest) | 274 getValuesResponse, err := ts.traceService.GetValues(ctx,
getValuesRequest) |
| 273 if err != nil { | 275 if err != nil { |
| 274 errCh <- fmt.Errorf("Failed to get values for %d
%#v: %s", i, *cid, err) | 276 errCh <- fmt.Errorf("Failed to get values for %d
%#v: %s", i, *cid, err) |
| 275 return | 277 return |
| 276 } | 278 } |
| 277 for _, pair := range getValuesResponse.Values { | 279 for _, pair := range getValuesResponse.Values { |
| 280 if pair == nil { |
| 281 glog.Errorf("Got a nil ValuePair in resp
onse: %s", err) |
| 282 continue |
| 283 } |
| 278 tr, ok := tile.Traces[pair.Key] | 284 tr, ok := tile.Traces[pair.Key] |
| 279 if !ok { | 285 if !ok { |
| 280 tileMutex.Lock() | 286 tileMutex.Lock() |
| 281 tile.Traces[pair.Key] = ts.traceBuilder(
n) | 287 tile.Traces[pair.Key] = ts.traceBuilder(
n) |
| 282 tileMutex.Unlock() | 288 tileMutex.Unlock() |
| 283 tr = tile.Traces[pair.Key] | 289 tr = tile.Traces[pair.Key] |
| 284 } | 290 } |
| 285 if err := tr.SetAt(i, pair.Value); err != nil { | 291 if err := tr.SetAt(i, pair.Value); err != nil { |
| 286 errCh <- fmt.Errorf("Unable to convert t
race value %d %#v: %s", i, *cid, err) | 292 errCh <- fmt.Errorf("Unable to convert t
race value %d %#v: %s", i, *cid, err) |
| 287 return | 293 return |
| (...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 370 return nil, fmt.Errorf("Did not get address for trace services."
) | 376 return nil, fmt.Errorf("Did not get address for trace services."
) |
| 371 } | 377 } |
| 372 | 378 |
| 373 conn, err := grpc.Dial(traceServiceAddr, grpc.WithInsecure()) | 379 conn, err := grpc.Dial(traceServiceAddr, grpc.WithInsecure()) |
| 374 if err != nil { | 380 if err != nil { |
| 375 return nil, fmt.Errorf("Unable to connnect to trace service at %
s. Got error: %s", traceServiceAddr, err) | 381 return nil, fmt.Errorf("Unable to connnect to trace service at %
s. Got error: %s", traceServiceAddr, err) |
| 376 } | 382 } |
| 377 | 383 |
| 378 return NewTraceServiceDB(conn, traceBuilder) | 384 return NewTraceServiceDB(conn, traceBuilder) |
| 379 } | 385 } |
| OLD | NEW |