OLD | NEW |
1 // Package tracedb provides a datastore for efficiently storing and retrieving t
races. | 1 // Package tracedb provides a datastore for efficiently storing and retrieving t
races. |
2 package db | 2 package db |
3 | 3 |
4 import ( | 4 import ( |
5 "fmt" | 5 "fmt" |
6 "sync" | 6 "sync" |
7 "time" | 7 "time" |
8 | 8 |
9 "github.com/golang/groupcache/lru" | 9 "github.com/golang/groupcache/lru" |
10 "github.com/skia-dev/glog" | 10 "github.com/skia-dev/glog" |
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
121 return ret, nil | 121 return ret, nil |
122 } | 122 } |
123 | 123 |
124 func (ts *TsDB) ping() error { | 124 func (ts *TsDB) ping() error { |
125 _, err := ts.traceService.Ping(ts.ctx, &traceservice.Empty{}) | 125 _, err := ts.traceService.Ping(ts.ctx, &traceservice.Empty{}) |
126 return err | 126 return err |
127 } | 127 } |
128 | 128 |
129 // addChunk adds a set of entries to the datastore at the given CommitID. | 129 // addChunk adds a set of entries to the datastore at the given CommitID. |
130 func (ts *TsDB) addChunk(ctx context.Context, cid *traceservice.CommitID, chunk
map[string]*Entry) error { | 130 func (ts *TsDB) addChunk(ctx context.Context, cid *traceservice.CommitID, chunk
map[string]*Entry) error { |
| 131 if len(chunk) == 0 { |
| 132 return nil |
| 133 } |
131 addReq := &traceservice.AddRequest{ | 134 addReq := &traceservice.AddRequest{ |
132 Commitid: cid, | 135 Commitid: cid, |
133 Values: []*traceservice.ValuePair{}, | 136 Values: []*traceservice.ValuePair{}, |
134 } | 137 } |
135 addParamsRequest := &traceservice.AddParamsRequest{ | 138 addParamsRequest := &traceservice.AddParamsRequest{ |
136 Params: []*traceservice.ParamsPair{}, | 139 Params: []*traceservice.ParamsPair{}, |
137 } | 140 } |
138 for traceid, entry := range chunk { | 141 for traceid, entry := range chunk { |
139 // Check that all the traceids have their Params. | 142 // Check that all the traceids have their Params. |
140 if _, ok := ts.cache.Get(traceid); !ok { | 143 if _, ok := ts.cache.Get(traceid); !ok { |
141 addParamsRequest.Params = append(addParamsRequest.Params
, &traceservice.ParamsPair{ | 144 addParamsRequest.Params = append(addParamsRequest.Params
, &traceservice.ParamsPair{ |
142 Key: traceid, | 145 Key: traceid, |
143 Params: entry.Params, | 146 Params: entry.Params, |
144 }) | 147 }) |
145 ts.cache.Add(traceid, true) | 148 ts.cache.Add(traceid, true) |
146 } | 149 } |
147 addReq.Values = append(addReq.Values, &traceservice.ValuePair{ | 150 addReq.Values = append(addReq.Values, &traceservice.ValuePair{ |
148 Key: traceid, | 151 Key: traceid, |
149 Value: entry.Value, | 152 Value: entry.Value, |
150 }) | 153 }) |
151 } | 154 } |
152 if len(addParamsRequest.Params) > 0 { | 155 if len(addParamsRequest.Params) > 0 { |
153 » » _, err := ts.traceService.AddParams(ctx, addParamsRequest) | 156 » » if _, err := ts.traceService.AddParams(ctx, addParamsRequest); e
rr != nil { |
154 » » if err != nil { | |
155 return fmt.Errorf("Failed to add params: %s", err) | 157 return fmt.Errorf("Failed to add params: %s", err) |
156 } | 158 } |
157 } | 159 } |
158 if _, err := ts.traceService.Add(ctx, addReq); err != nil { | 160 if _, err := ts.traceService.Add(ctx, addReq); err != nil { |
159 return fmt.Errorf("Failed to add values: %s", err) | 161 return fmt.Errorf("Failed to add values: %s", err) |
160 } | 162 } |
161 return nil | 163 return nil |
162 } | 164 } |
163 | 165 |
164 // tsCommitID converts a db.CommitID to traceservice.CommitID. | 166 // tsCommitID converts a db.CommitID to traceservice.CommitID. |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
268 // Load the values for the commit. | 270 // Load the values for the commit. |
269 getValuesRequest := &traceservice.GetValuesRequest{ | 271 getValuesRequest := &traceservice.GetValuesRequest{ |
270 Commitid: tsCommitID(cid), | 272 Commitid: tsCommitID(cid), |
271 } | 273 } |
272 getValuesResponse, err := ts.traceService.GetValues(ctx,
getValuesRequest) | 274 getValuesResponse, err := ts.traceService.GetValues(ctx,
getValuesRequest) |
273 if err != nil { | 275 if err != nil { |
274 errCh <- fmt.Errorf("Failed to get values for %d
%#v: %s", i, *cid, err) | 276 errCh <- fmt.Errorf("Failed to get values for %d
%#v: %s", i, *cid, err) |
275 return | 277 return |
276 } | 278 } |
277 for _, pair := range getValuesResponse.Values { | 279 for _, pair := range getValuesResponse.Values { |
| 280 if pair == nil { |
| 281 glog.Errorf("Got a nil ValuePair in resp
onse: %s", err) |
| 282 continue |
| 283 } |
278 tr, ok := tile.Traces[pair.Key] | 284 tr, ok := tile.Traces[pair.Key] |
279 if !ok { | 285 if !ok { |
280 tileMutex.Lock() | 286 tileMutex.Lock() |
281 tile.Traces[pair.Key] = ts.traceBuilder(
n) | 287 tile.Traces[pair.Key] = ts.traceBuilder(
n) |
282 tileMutex.Unlock() | 288 tileMutex.Unlock() |
283 tr = tile.Traces[pair.Key] | 289 tr = tile.Traces[pair.Key] |
284 } | 290 } |
285 if err := tr.SetAt(i, pair.Value); err != nil { | 291 if err := tr.SetAt(i, pair.Value); err != nil { |
286 errCh <- fmt.Errorf("Unable to convert t
race value %d %#v: %s", i, *cid, err) | 292 errCh <- fmt.Errorf("Unable to convert t
race value %d %#v: %s", i, *cid, err) |
287 return | 293 return |
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
370 return nil, fmt.Errorf("Did not get address for trace services."
) | 376 return nil, fmt.Errorf("Did not get address for trace services."
) |
371 } | 377 } |
372 | 378 |
373 conn, err := grpc.Dial(traceServiceAddr, grpc.WithInsecure()) | 379 conn, err := grpc.Dial(traceServiceAddr, grpc.WithInsecure()) |
374 if err != nil { | 380 if err != nil { |
375 return nil, fmt.Errorf("Unable to connnect to trace service at %
s. Got error: %s", traceServiceAddr, err) | 381 return nil, fmt.Errorf("Unable to connnect to trace service at %
s. Got error: %s", traceServiceAddr, err) |
376 } | 382 } |
377 | 383 |
378 return NewTraceServiceDB(conn, traceBuilder) | 384 return NewTraceServiceDB(conn, traceBuilder) |
379 } | 385 } |
OLD | NEW |