Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(520)

Side by Side Diff: logdog/common/storage/bigtable/storage.go

Issue 2611253005: Fix BigTable multi-entry row cached Tail. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package bigtable 5 package bigtable
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
(...skipping 208 matching lines...) Expand 10 before | Expand all | Expand 10 after
219 err := s.raw.getLogData(ctx, startKey, r.Limit, r.KeysOnly, func(rk *row Key, data []byte) error { 219 err := s.raw.getLogData(ctx, startKey, r.Limit, r.KeysOnly, func(rk *row Key, data []byte) error {
220 // Does this key match our requested log stream? If not, we've m oved past 220 // Does this key match our requested log stream? If not, we've m oved past
221 // this stream's records and must stop iteration. 221 // this stream's records and must stop iteration.
222 if !rk.sharesPathWith(startKey) { 222 if !rk.sharesPathWith(startKey) {
223 return errStop 223 return errStop
224 } 224 }
225 225
226 // Calculate the start index of the contiguous row. Since we ind ex the row 226 // Calculate the start index of the contiguous row. Since we ind ex the row
227 // on the LAST entry in the row, count backwards to get the inde x of the 227 // on the LAST entry in the row, count backwards to get the inde x of the
228 // first entry. 228 // first entry.
229 » » startIndex := rk.index - rk.count + 1 229 » » startIndex := rk.firstIndex()
230 if startIndex < 0 { 230 if startIndex < 0 {
231 return storage.ErrBadData 231 return storage.ErrBadData
232 } 232 }
233 233
234 // Split our data into records. Leave the records slice nil if w e're doing 234 // Split our data into records. Leave the records slice nil if w e're doing
235 // a keys-only get. 235 // a keys-only get.
236 var records [][]byte 236 var records [][]byte
237 if !r.KeysOnly { 237 if !r.KeysOnly {
238 var err error 238 var err error
239 if records, err = recordio.Split(data); err != nil { 239 if records, err = recordio.Split(data); err != nil {
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
316 316
317 // Iterate through all log keys in the stream. Record the latest contigu ous 317 // Iterate through all log keys in the stream. Record the latest contigu ous
318 // one. 318 // one.
319 var ( 319 var (
320 rk = newRowKey(string(project), string(path), startIdx, 0 ) 320 rk = newRowKey(string(project), string(path), startIdx, 0 )
321 latest *rowKey 321 latest *rowKey
322 nextIndex = startIdx 322 nextIndex = startIdx
323 ) 323 )
324 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error { 324 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
325 // If this record is non-contiguous, we're done iterating. 325 // If this record is non-contiguous, we're done iterating.
326 » » if (rk.index - rk.count + 1) != nextIndex { 326 » » if rk.firstIndex() != nextIndex {
327 return errStop 327 return errStop
328 } 328 }
329 329
330 latest, nextIndex = rk, rk.index+1 330 latest, nextIndex = rk, rk.index+1
331 return nil 331 return nil
332 }) 332 })
333 if err != nil && err != errStop { 333 if err != nil && err != errStop {
334 log.Fields{ 334 log.Fields{
335 log.ErrorKey: err, 335 log.ErrorKey: err,
336 "project": s.Project, 336 "project": s.Project,
337 "instance": s.Instance, 337 "instance": s.Instance,
338 "table": s.LogTable, 338 "table": s.LogTable,
339 }.Errorf(ctx, "Failed to scan for tail.") 339 }.Errorf(ctx, "Failed to scan for tail.")
340 } 340 }
341 341
342 if latest == nil { 342 if latest == nil {
343 // No rows for the specified stream. 343 // No rows for the specified stream.
344 return nil, storage.ErrDoesNotExist 344 return nil, storage.ErrDoesNotExist
345 } 345 }
346 346
347 // Update our cache if the tail index has changed. 347 // Update our cache if the tail index has changed.
348 if s.Cache != nil && startIdx != latest.index { 348 if s.Cache != nil && startIdx != latest.index {
349 » » putLastTailIndex(s, s.Cache, project, path, latest.index) 349 » » // We cache the first index in the row so that subsequent cached fetches
350 » » // have the correct "startIdx" expectations.
351 » » putLastTailIndex(s, s.Cache, project, path, latest.firstIndex())
350 } 352 }
351 353
352 // Fetch the latest row's data. 354 // Fetch the latest row's data.
353 var d []byte 355 var d []byte
354 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by te) error { 356 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by te) error {
355 records, err := recordio.Split(data) 357 records, err := recordio.Split(data)
356 if err != nil || len(records) == 0 { 358 if err != nil || len(records) == 0 {
357 return storage.ErrBadData 359 return storage.ErrBadData
358 } 360 }
359 d = records[len(records)-1] 361 d = records[len(records)-1]
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
430 }.Debugf(ctx, "Adding entries to BigTable.") 432 }.Debugf(ctx, "Adding entries to BigTable.")
431 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { 433 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil {
432 return 0, err 434 return 0, err
433 } 435 }
434 436
435 // Reset our buffer state. 437 // Reset our buffer state.
436 w.buf.Reset() 438 w.buf.Reset()
437 w.count = 0 439 w.count = 0
438 return flushCount, nil 440 return flushCount, nil
439 } 441 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698