Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Side by Side Diff: logdog/common/storage/bigtable/storage.go

Issue 2435883002: LogDog: Fix archival Get/Tail implementations. (Closed)
Patch Set: LogDog: Fix archival Get/Tail implementations. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package bigtable 5 package bigtable
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after
260 "startIndex": startIndex, 260 "startIndex": startIndex,
261 }.Debugf(ctx, "Punting row key range [%d - %d]...", startIndex, rk.index) 261 }.Debugf(ctx, "Punting row key range [%d - %d]...", startIndex, rk.index)
262 262
263 for index := startIndex; index <= rk.index; index++ { 263 for index := startIndex; index <= rk.index; index++ {
264 // If we're not doing keys-only, consume the row. 264 // If we're not doing keys-only, consume the row.
265 var row []byte 265 var row []byte
266 if !r.KeysOnly { 266 if !r.KeysOnly {
267 row, records = records[0], records[1:] 267 row, records = records[0], records[1:]
268 } 268 }
269 269
270 » » » if !cb(types.MessageIndex(index), row) { 270 » » » if !cb(storage.MakeEntry(row, types.MessageIndex(index)) ) {
271 return errStop 271 return errStop
272 } 272 }
273 r.Index = types.MessageIndex(index + 1) 273 r.Index = types.MessageIndex(index + 1)
274 274
275 // Artificially apply limit within our row records. 275 // Artificially apply limit within our row records.
276 if limit > 0 { 276 if limit > 0 {
277 limit-- 277 limit--
278 if limit == 0 { 278 if limit == 0 {
279 return errStop 279 return errStop
280 } 280 }
281 } 281 }
282 } 282 }
283 return nil 283 return nil
284 }) 284 })
285 285
286 switch err { 286 switch err {
287 case nil, errStop: 287 case nil, errStop:
288 return nil 288 return nil
289 289
290 default: 290 default:
291 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") 291 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.")
292 return err 292 return err
293 } 293 }
294 } 294 }
295 295
296 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) ([]b yte, types.MessageIndex, error) { 296 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) (*st orage.Entry, error) {
297 ctx := log.SetFields(s, log.Fields{ 297 ctx := log.SetFields(s, log.Fields{
298 "project": project, 298 "project": project,
299 "path": path, 299 "path": path,
300 }) 300 })
301 301
302 // Iterate through all log keys in the stream. Record the latest one. 302 // Iterate through all log keys in the stream. Record the latest one.
303 rk := newRowKey(string(project), string(path), 0, 0) 303 rk := newRowKey(string(project), string(path), 0, 0)
304 var latest *rowKey 304 var latest *rowKey
305 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error { 305 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
306 latest = rk 306 latest = rk
307 return nil 307 return nil
308 }) 308 })
309 if err != nil { 309 if err != nil {
310 log.Fields{ 310 log.Fields{
311 log.ErrorKey: err, 311 log.ErrorKey: err,
312 "project": s.Project, 312 "project": s.Project,
313 "instance": s.Instance, 313 "instance": s.Instance,
314 "table": s.LogTable, 314 "table": s.LogTable,
315 }.Errorf(ctx, "Failed to scan for tail.") 315 }.Errorf(ctx, "Failed to scan for tail.")
316 } 316 }
317 317
318 if latest == nil { 318 if latest == nil {
319 // No rows for the specified stream. 319 // No rows for the specified stream.
320 » » return nil, 0, storage.ErrDoesNotExist 320 » » return nil, storage.ErrDoesNotExist
321 } 321 }
322 322
323 // Fetch the latest row's data. 323 // Fetch the latest row's data.
324 var d []byte 324 var d []byte
325 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by te) error { 325 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by te) error {
326 records, err := recordio.Split(data) 326 records, err := recordio.Split(data)
327 if err != nil || len(records) == 0 { 327 if err != nil || len(records) == 0 {
328 return storage.ErrBadData 328 return storage.ErrBadData
329 } 329 }
330 d = records[len(records)-1] 330 d = records[len(records)-1]
331 return errStop 331 return errStop
332 }) 332 })
333 if err != nil && err != errStop { 333 if err != nil && err != errStop {
334 log.Fields{ 334 log.Fields{
335 log.ErrorKey: err, 335 log.ErrorKey: err,
336 "project": s.Project, 336 "project": s.Project,
337 "instance": s.Instance, 337 "instance": s.Instance,
338 "table": s.LogTable, 338 "table": s.LogTable,
339 }.Errorf(ctx, "Failed to retrieve tail row.") 339 }.Errorf(ctx, "Failed to retrieve tail row.")
340 } 340 }
341 341
342 » return d, types.MessageIndex(latest.index), nil 342 » return storage.MakeEntry(d, types.MessageIndex(latest.index)), nil
343 } 343 }
344 344
345 // rowWriter facilitates writing several consecutive data values to a single 345 // rowWriter facilitates writing several consecutive data values to a single
346 // BigTable row. 346 // BigTable row.
347 type rowWriter struct { 347 type rowWriter struct {
348 // buf is the current set of buffered data. 348 // buf is the current set of buffered data.
349 buf bytes.Buffer 349 buf bytes.Buffer
350 350
351 // count is the number of rows in the writer. 351 // count is the number of rows in the writer.
352 count int 352 count int
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
401 }.Debugf(ctx, "Adding entries to BigTable.") 401 }.Debugf(ctx, "Adding entries to BigTable.")
402 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { 402 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil {
403 return 0, err 403 return 0, err
404 } 404 }
405 405
406 // Reset our buffer state. 406 // Reset our buffer state.
407 w.buf.Reset() 407 w.buf.Reset()
408 w.count = 0 408 w.count = 0
409 return flushCount, nil 409 return flushCount, nil
410 } 410 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698