| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 208 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 219 err := s.raw.getLogData(ctx, startKey, r.Limit, r.KeysOnly, func(rk *row
Key, data []byte) error { | 219 err := s.raw.getLogData(ctx, startKey, r.Limit, r.KeysOnly, func(rk *row
Key, data []byte) error { |
| 220 // Does this key match our requested log stream? If not, we've m
oved past | 220 // Does this key match our requested log stream? If not, we've m
oved past |
| 221 // this stream's records and must stop iteration. | 221 // this stream's records and must stop iteration. |
| 222 if !rk.sharesPathWith(startKey) { | 222 if !rk.sharesPathWith(startKey) { |
| 223 return errStop | 223 return errStop |
| 224 } | 224 } |
| 225 | 225 |
| 226 // Calculate the start index of the contiguous row. Since we ind
ex the row | 226 // Calculate the start index of the contiguous row. Since we ind
ex the row |
| 227 // on the LAST entry in the row, count backwards to get the inde
x of the | 227 // on the LAST entry in the row, count backwards to get the inde
x of the |
| 228 // first entry. | 228 // first entry. |
| 229 » » startIndex := rk.index - rk.count + 1 | 229 » » startIndex := rk.firstIndex() |
| 230 if startIndex < 0 { | 230 if startIndex < 0 { |
| 231 return storage.ErrBadData | 231 return storage.ErrBadData |
| 232 } | 232 } |
| 233 | 233 |
| 234 // Split our data into records. Leave the records slice nil if w
e're doing | 234 // Split our data into records. Leave the records slice nil if w
e're doing |
| 235 // a keys-only get. | 235 // a keys-only get. |
| 236 var records [][]byte | 236 var records [][]byte |
| 237 if !r.KeysOnly { | 237 if !r.KeysOnly { |
| 238 var err error | 238 var err error |
| 239 if records, err = recordio.Split(data); err != nil { | 239 if records, err = recordio.Split(data); err != nil { |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 316 | 316 |
| 317 // Iterate through all log keys in the stream. Record the latest contigu
ous | 317 // Iterate through all log keys in the stream. Record the latest contigu
ous |
| 318 // one. | 318 // one. |
| 319 var ( | 319 var ( |
| 320 rk = newRowKey(string(project), string(path), startIdx, 0
) | 320 rk = newRowKey(string(project), string(path), startIdx, 0
) |
| 321 latest *rowKey | 321 latest *rowKey |
| 322 nextIndex = startIdx | 322 nextIndex = startIdx |
| 323 ) | 323 ) |
| 324 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte)
error { | 324 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte)
error { |
| 325 // If this record is non-contiguous, we're done iterating. | 325 // If this record is non-contiguous, we're done iterating. |
| 326 » » if (rk.index - rk.count + 1) != nextIndex { | 326 » » if rk.firstIndex() != nextIndex { |
| 327 return errStop | 327 return errStop |
| 328 } | 328 } |
| 329 | 329 |
| 330 latest, nextIndex = rk, rk.index+1 | 330 latest, nextIndex = rk, rk.index+1 |
| 331 return nil | 331 return nil |
| 332 }) | 332 }) |
| 333 if err != nil && err != errStop { | 333 if err != nil && err != errStop { |
| 334 log.Fields{ | 334 log.Fields{ |
| 335 log.ErrorKey: err, | 335 log.ErrorKey: err, |
| 336 "project": s.Project, | 336 "project": s.Project, |
| 337 "instance": s.Instance, | 337 "instance": s.Instance, |
| 338 "table": s.LogTable, | 338 "table": s.LogTable, |
| 339 }.Errorf(ctx, "Failed to scan for tail.") | 339 }.Errorf(ctx, "Failed to scan for tail.") |
| 340 } | 340 } |
| 341 | 341 |
| 342 if latest == nil { | 342 if latest == nil { |
| 343 // No rows for the specified stream. | 343 // No rows for the specified stream. |
| 344 return nil, storage.ErrDoesNotExist | 344 return nil, storage.ErrDoesNotExist |
| 345 } | 345 } |
| 346 | 346 |
| 347 // Update our cache if the tail index has changed. | 347 // Update our cache if the tail index has changed. |
| 348 if s.Cache != nil && startIdx != latest.index { | 348 if s.Cache != nil && startIdx != latest.index { |
| 349 » » putLastTailIndex(s, s.Cache, project, path, latest.index) | 349 » » // We cache the first index in the row so that subsequent cached
fetches |
| 350 » » // have the correct "startIdx" expectations. |
| 351 » » putLastTailIndex(s, s.Cache, project, path, latest.firstIndex()) |
| 350 } | 352 } |
| 351 | 353 |
| 352 // Fetch the latest row's data. | 354 // Fetch the latest row's data. |
| 353 var d []byte | 355 var d []byte |
| 354 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by
te) error { | 356 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by
te) error { |
| 355 records, err := recordio.Split(data) | 357 records, err := recordio.Split(data) |
| 356 if err != nil || len(records) == 0 { | 358 if err != nil || len(records) == 0 { |
| 357 return storage.ErrBadData | 359 return storage.ErrBadData |
| 358 } | 360 } |
| 359 d = records[len(records)-1] | 361 d = records[len(records)-1] |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 430 }.Debugf(ctx, "Adding entries to BigTable.") | 432 }.Debugf(ctx, "Adding entries to BigTable.") |
| 431 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { | 433 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { |
| 432 return 0, err | 434 return 0, err |
| 433 } | 435 } |
| 434 | 436 |
| 435 // Reset our buffer state. | 437 // Reset our buffer state. |
| 436 w.buf.Reset() | 438 w.buf.Reset() |
| 437 w.count = 0 | 439 w.count = 0 |
| 438 return flushCount, nil | 440 return flushCount, nil |
| 439 } | 441 } |
| OLD | NEW |