OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package bigtable | 5 package bigtable |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "errors" | 9 "errors" |
10 "fmt" | 10 "fmt" |
11 | 11 |
| 12 "github.com/luci/luci-go/common/config" |
12 "github.com/luci/luci-go/common/logdog/types" | 13 "github.com/luci/luci-go/common/logdog/types" |
13 log "github.com/luci/luci-go/common/logging" | 14 log "github.com/luci/luci-go/common/logging" |
14 "github.com/luci/luci-go/common/recordio" | 15 "github.com/luci/luci-go/common/recordio" |
15 "github.com/luci/luci-go/server/logdog/storage" | 16 "github.com/luci/luci-go/server/logdog/storage" |
16 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
17 "google.golang.org/cloud" | 18 "google.golang.org/cloud" |
18 "google.golang.org/cloud/bigtable" | 19 "google.golang.org/cloud/bigtable" |
19 ) | 20 ) |
20 | 21 |
21 var ( | 22 var ( |
(...skipping 26 matching lines...) Expand all Loading... |
48 ) | 49 ) |
49 | 50 |
50 var ( | 51 var ( |
51 // errStop is an internal sentinel error used to indicate "stop iteratio
n" | 52 // errStop is an internal sentinel error used to indicate "stop iteratio
n" |
52 // to btTable.getLogData iterator. | 53 // to btTable.getLogData iterator. |
53 errStop = errors.New("bigtable: stop iteration") | 54 errStop = errors.New("bigtable: stop iteration") |
54 | 55 |
55 // errRepeatRequest is a transition sentinel that instructs us to repeat | 56 // errRepeatRequest is a transition sentinel that instructs us to repeat |
56 // our query with KeysOnly set to true. | 57 // our query with KeysOnly set to true. |
57 // | 58 // |
58 » // This can be deleted once BigTable rows without a count have been aged | 59 » // TODO(dnj): This can be deleted once BigTable rows without a count hav
e been |
59 » // off. | 60 » // aged off. |
60 errRepeatRequest = errors.New("bigtable: repeat request") | 61 errRepeatRequest = errors.New("bigtable: repeat request") |
61 ) | 62 ) |
62 | 63 |
63 // Options is a set of configuration options for BigTable storage. | 64 // Options is a set of configuration options for BigTable storage. |
64 type Options struct { | 65 type Options struct { |
65 // Project is the name of the project to connect to. | 66 // Project is the name of the project to connect to. |
66 Project string | 67 Project string |
67 // Zone is the name of the zone to connect to. | 68 // Zone is the name of the zone to connect to. |
68 Zone string | 69 Zone string |
69 // Cluster is the name of the cluster to connect to. | 70 // Cluster is the name of the cluster to connect to. |
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
164 func (s *btStorage) Put(r storage.PutRequest) error { | 165 func (s *btStorage) Put(r storage.PutRequest) error { |
165 rw := rowWriter{ | 166 rw := rowWriter{ |
166 threshold: s.maxRowSize, | 167 threshold: s.maxRowSize, |
167 } | 168 } |
168 | 169 |
169 for len(r.Values) > 0 { | 170 for len(r.Values) > 0 { |
170 // Add the next entry to the writer. | 171 // Add the next entry to the writer. |
171 if appended := rw.append(r.Values[0]); !appended { | 172 if appended := rw.append(r.Values[0]); !appended { |
172 // We have failed to append our maximum BigTable row siz
e. Flush any | 173 // We have failed to append our maximum BigTable row siz
e. Flush any |
173 // currently-buffered row data and try again with an emp
ty buffer. | 174 // currently-buffered row data and try again with an emp
ty buffer. |
174 » » » count, err := rw.flush(s, s.raw, r.Index, r.Path) | 175 » » » count, err := rw.flush(s, s.raw, r.Index, r.Project, r.P
ath) |
175 if err != nil { | 176 if err != nil { |
176 return err | 177 return err |
177 } | 178 } |
178 | 179 |
179 if count == 0 { | 180 if count == 0 { |
180 // Nothing was buffered, but we still couldn't a
ppend an entry. The | 181 // Nothing was buffered, but we still couldn't a
ppend an entry. The |
181 // current entry is too large by itself, so we m
ust fail. | 182 // current entry is too large by itself, so we m
ust fail. |
182 return fmt.Errorf("single row entry exceeds maxi
mum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes) | 183 return fmt.Errorf("single row entry exceeds maxi
mum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes) |
183 } | 184 } |
184 | 185 |
185 r.Index += types.MessageIndex(count) | 186 r.Index += types.MessageIndex(count) |
186 continue | 187 continue |
187 } | 188 } |
188 | 189 |
189 // We successfully appended this entry, so advance. | 190 // We successfully appended this entry, so advance. |
190 r.Values = r.Values[1:] | 191 r.Values = r.Values[1:] |
191 } | 192 } |
192 | 193 |
193 // Flush any buffered rows. | 194 // Flush any buffered rows. |
194 » if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil { | 195 » if _, err := rw.flush(s, s.raw, r.Index, r.Project, r.Path); err != nil
{ |
195 return err | 196 return err |
196 } | 197 } |
197 return nil | 198 return nil |
198 } | 199 } |
199 | 200 |
200 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { | 201 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { |
201 » startKey := newRowKey(string(r.Path), int64(r.Index), 0) | 202 » startKey := newRowKey(string(r.Project), string(r.Path), int64(r.Index),
0) |
202 ctx := log.SetFields(s, log.Fields{ | 203 ctx := log.SetFields(s, log.Fields{ |
| 204 "project": r.Project, |
203 "path": r.Path, | 205 "path": r.Path, |
204 "index": r.Index, | 206 "index": r.Index, |
205 "limit": r.Limit, | 207 "limit": r.Limit, |
206 "startRowKey": startKey, | 208 "startRowKey": startKey, |
207 "keysOnly": r.KeysOnly, | 209 "keysOnly": r.KeysOnly, |
208 }) | 210 }) |
209 | 211 |
210 // If we issue a query and get back a legacy row, it will have no count | 212 // If we issue a query and get back a legacy row, it will have no count |
211 // associated with it. We will fast-exit | 213 // associated with it. We will fast-exit |
212 | 214 |
(...skipping 17 matching lines...) Expand all Loading... |
230 | 232 |
231 switch { | 233 switch { |
232 case r.KeysOnly: | 234 case r.KeysOnly: |
233 // Keys only query, so count is the authority. | 235 // Keys only query, so count is the authority. |
234 if rk.count == 0 { | 236 if rk.count == 0 { |
235 // If it's zero, we are dealing with a legacy ro
w, before we started | 237 // If it's zero, we are dealing with a legacy ro
w, before we started |
236 // keeping count. A keys-only query is insuffici
ent to get the | 238 // keeping count. A keys-only query is insuffici
ent to get the |
237 // inforamtion that we need, so repeat this Get
request with KeysOnly | 239 // inforamtion that we need, so repeat this Get
request with KeysOnly |
238 // set to false. | 240 // set to false. |
239 // | 241 // |
240 » » » » // NOTE: This logic can be removed once all unco
unted rows are aged off. | 242 » » » » // TODO(dnj): This logic can be removed once all
uncounted rows are aged |
| 243 » » » » // off. |
241 r.KeysOnly = false | 244 r.KeysOnly = false |
242 return errRepeatRequest | 245 return errRepeatRequest |
243 } | 246 } |
244 break | 247 break |
245 | 248 |
246 case rk.count == 0: | 249 case rk.count == 0: |
247 // This is a non-keys-only query, but we are dealing wit
h a legacy row. | 250 // This is a non-keys-only query, but we are dealing wit
h a legacy row. |
248 // Use the record count as the authority. | 251 // Use the record count as the authority. |
249 // | 252 // |
250 » » » // NOTE: This logic can be removed once all uncounted ro
ws are aged off. | 253 » » » // TODO(dnj): This logic can be removed once all uncount
ed rows are aged |
| 254 » » » // off. |
251 rk.count = int64(len(records)) | 255 rk.count = int64(len(records)) |
252 | 256 |
253 case rk.count == int64(len(records)): | 257 case rk.count == int64(len(records)): |
254 // This is the expected case, so we're set. | 258 // This is the expected case, so we're set. |
255 // | 259 // |
256 » » » // NOTE: Once uncounted rows are aged off, this is the o
nly logic that | 260 » » » // TODO(dnj): Once uncounted rows are aged off, this is
the only logic |
257 » » » // we need, in the form of a sanity assertion. | 261 » » » // that we need, in the form of a sanity assertion. |
258 break | 262 break |
259 | 263 |
260 default: | 264 default: |
261 log.Fields{ | 265 log.Fields{ |
262 "count": rk.count, | 266 "count": rk.count, |
263 "recordCount": len(records), | 267 "recordCount": len(records), |
264 }.Errorf(ctx, "Record count doesn't match declared count
.") | 268 }.Errorf(ctx, "Record count doesn't match declared count
.") |
265 return storage.ErrBadData | 269 return storage.ErrBadData |
266 } | 270 } |
267 | 271 |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
319 | 323 |
320 case errRepeatRequest: | 324 case errRepeatRequest: |
321 return s.Get(r, cb) | 325 return s.Get(r, cb) |
322 | 326 |
323 default: | 327 default: |
324 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") | 328 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") |
325 return err | 329 return err |
326 } | 330 } |
327 } | 331 } |
328 | 332 |
329 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error)
{ | 333 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) ([]b
yte, types.MessageIndex, error) { |
330 ctx := log.SetFields(s, log.Fields{ | 334 ctx := log.SetFields(s, log.Fields{ |
331 » » "path": p, | 335 » » "project": project, |
| 336 » » "path": path, |
332 }) | 337 }) |
333 | 338 |
334 // Iterate through all log keys in the stream. Record the latest one. | 339 // Iterate through all log keys in the stream. Record the latest one. |
335 » rk := newRowKey(string(p), 0, 0) | 340 » rk := newRowKey(string(project), string(path), 0, 0) |
336 var latest *rowKey | 341 var latest *rowKey |
337 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte)
error { | 342 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte)
error { |
338 latest = rk | 343 latest = rk |
339 return nil | 344 return nil |
340 }) | 345 }) |
341 if err != nil { | 346 if err != nil { |
342 log.Fields{ | 347 log.Fields{ |
343 log.ErrorKey: err, | 348 log.ErrorKey: err, |
344 "project": s.Project, | 349 "project": s.Project, |
345 "zone": s.Zone, | 350 "zone": s.Zone, |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
405 } | 410 } |
406 | 411 |
407 // If we have exceeded our threshold, report a failure. | 412 // If we have exceeded our threshold, report a failure. |
408 appended = (w.buf.Len() <= w.threshold) | 413 appended = (w.buf.Len() <= w.threshold) |
409 if appended { | 414 if appended { |
410 w.count++ | 415 w.count++ |
411 } | 416 } |
412 return | 417 return |
413 } | 418 } |
414 | 419 |
415 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
ndex, path types.StreamPath) (int, error) { | 420 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
ndex, |
| 421 » project config.ProjectName, path types.StreamPath) (int, error) { |
416 flushCount := w.count | 422 flushCount := w.count |
417 if flushCount == 0 { | 423 if flushCount == 0 { |
418 return 0, nil | 424 return 0, nil |
419 } | 425 } |
420 | 426 |
421 // Write the current set of buffered rows to the table. Index on the LAS
T | 427 // Write the current set of buffered rows to the table. Index on the LAS
T |
422 // row index. | 428 // row index. |
423 lastIndex := int64(index) + int64(flushCount) - 1 | 429 lastIndex := int64(index) + int64(flushCount) - 1 |
424 » rk := newRowKey(string(path), lastIndex, int64(w.count)) | 430 » rk := newRowKey(string(project), string(path), lastIndex, int64(w.count)
) |
425 | 431 |
426 log.Fields{ | 432 log.Fields{ |
427 "rowKey": rk, | 433 "rowKey": rk, |
| 434 "project": project, |
428 "path": path, | 435 "path": path, |
429 "index": index, | 436 "index": index, |
430 "lastIndex": lastIndex, | 437 "lastIndex": lastIndex, |
431 "count": w.count, | 438 "count": w.count, |
432 "size": w.buf.Len(), | 439 "size": w.buf.Len(), |
433 }.Debugf(ctx, "Adding entries to BigTable.") | 440 }.Debugf(ctx, "Adding entries to BigTable.") |
434 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { | 441 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { |
435 return 0, err | 442 return 0, err |
436 } | 443 } |
437 | 444 |
438 // Reset our buffer state. | 445 // Reset our buffer state. |
439 w.buf.Reset() | 446 w.buf.Reset() |
440 w.count = 0 | 447 w.count = 0 |
441 return flushCount, nil | 448 return flushCount, nil |
442 } | 449 } |
OLD | NEW |