| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 260 "startIndex": startIndex, | 260 "startIndex": startIndex, |
| 261 }.Debugf(ctx, "Punting row key range [%d - %d]...", startIndex,
rk.index) | 261 }.Debugf(ctx, "Punting row key range [%d - %d]...", startIndex,
rk.index) |
| 262 | 262 |
| 263 for index := startIndex; index <= rk.index; index++ { | 263 for index := startIndex; index <= rk.index; index++ { |
| 264 // If we're not doing keys-only, consume the row. | 264 // If we're not doing keys-only, consume the row. |
| 265 var row []byte | 265 var row []byte |
| 266 if !r.KeysOnly { | 266 if !r.KeysOnly { |
| 267 row, records = records[0], records[1:] | 267 row, records = records[0], records[1:] |
| 268 } | 268 } |
| 269 | 269 |
| 270 » » » if !cb(types.MessageIndex(index), row) { | 270 » » » if !cb(storage.MakeEntry(row, types.MessageIndex(index))
) { |
| 271 return errStop | 271 return errStop |
| 272 } | 272 } |
| 273 r.Index = types.MessageIndex(index + 1) | 273 r.Index = types.MessageIndex(index + 1) |
| 274 | 274 |
| 275 // Artificially apply limit within our row records. | 275 // Artificially apply limit within our row records. |
| 276 if limit > 0 { | 276 if limit > 0 { |
| 277 limit-- | 277 limit-- |
| 278 if limit == 0 { | 278 if limit == 0 { |
| 279 return errStop | 279 return errStop |
| 280 } | 280 } |
| 281 } | 281 } |
| 282 } | 282 } |
| 283 return nil | 283 return nil |
| 284 }) | 284 }) |
| 285 | 285 |
| 286 switch err { | 286 switch err { |
| 287 case nil, errStop: | 287 case nil, errStop: |
| 288 return nil | 288 return nil |
| 289 | 289 |
| 290 default: | 290 default: |
| 291 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") | 291 log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") |
| 292 return err | 292 return err |
| 293 } | 293 } |
| 294 } | 294 } |
| 295 | 295 |
| 296 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) ([]b
yte, types.MessageIndex, error) { | 296 func (s *btStorage) Tail(project config.ProjectName, path types.StreamPath) (*st
orage.Entry, error) { |
| 297 ctx := log.SetFields(s, log.Fields{ | 297 ctx := log.SetFields(s, log.Fields{ |
| 298 "project": project, | 298 "project": project, |
| 299 "path": path, | 299 "path": path, |
| 300 }) | 300 }) |
| 301 | 301 |
| 302 // Iterate through all log keys in the stream. Record the latest one. | 302 // Iterate through all log keys in the stream. Record the latest one. |
| 303 rk := newRowKey(string(project), string(path), 0, 0) | 303 rk := newRowKey(string(project), string(path), 0, 0) |
| 304 var latest *rowKey | 304 var latest *rowKey |
| 305 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte)
error { | 305 err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte)
error { |
| 306 latest = rk | 306 latest = rk |
| 307 return nil | 307 return nil |
| 308 }) | 308 }) |
| 309 if err != nil { | 309 if err != nil { |
| 310 log.Fields{ | 310 log.Fields{ |
| 311 log.ErrorKey: err, | 311 log.ErrorKey: err, |
| 312 "project": s.Project, | 312 "project": s.Project, |
| 313 "instance": s.Instance, | 313 "instance": s.Instance, |
| 314 "table": s.LogTable, | 314 "table": s.LogTable, |
| 315 }.Errorf(ctx, "Failed to scan for tail.") | 315 }.Errorf(ctx, "Failed to scan for tail.") |
| 316 } | 316 } |
| 317 | 317 |
| 318 if latest == nil { | 318 if latest == nil { |
| 319 // No rows for the specified stream. | 319 // No rows for the specified stream. |
| 320 » » return nil, 0, storage.ErrDoesNotExist | 320 » » return nil, storage.ErrDoesNotExist |
| 321 } | 321 } |
| 322 | 322 |
| 323 // Fetch the latest row's data. | 323 // Fetch the latest row's data. |
| 324 var d []byte | 324 var d []byte |
| 325 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by
te) error { | 325 err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by
te) error { |
| 326 records, err := recordio.Split(data) | 326 records, err := recordio.Split(data) |
| 327 if err != nil || len(records) == 0 { | 327 if err != nil || len(records) == 0 { |
| 328 return storage.ErrBadData | 328 return storage.ErrBadData |
| 329 } | 329 } |
| 330 d = records[len(records)-1] | 330 d = records[len(records)-1] |
| 331 return errStop | 331 return errStop |
| 332 }) | 332 }) |
| 333 if err != nil && err != errStop { | 333 if err != nil && err != errStop { |
| 334 log.Fields{ | 334 log.Fields{ |
| 335 log.ErrorKey: err, | 335 log.ErrorKey: err, |
| 336 "project": s.Project, | 336 "project": s.Project, |
| 337 "instance": s.Instance, | 337 "instance": s.Instance, |
| 338 "table": s.LogTable, | 338 "table": s.LogTable, |
| 339 }.Errorf(ctx, "Failed to retrieve tail row.") | 339 }.Errorf(ctx, "Failed to retrieve tail row.") |
| 340 } | 340 } |
| 341 | 341 |
| 342 » return d, types.MessageIndex(latest.index), nil | 342 » return storage.MakeEntry(d, types.MessageIndex(latest.index)), nil |
| 343 } | 343 } |
| 344 | 344 |
| 345 // rowWriter facilitates writing several consecutive data values to a single | 345 // rowWriter facilitates writing several consecutive data values to a single |
| 346 // BigTable row. | 346 // BigTable row. |
| 347 type rowWriter struct { | 347 type rowWriter struct { |
| 348 // buf is the current set of buffered data. | 348 // buf is the current set of buffered data. |
| 349 buf bytes.Buffer | 349 buf bytes.Buffer |
| 350 | 350 |
| 351 // count is the number of rows in the writer. | 351 // count is the number of rows in the writer. |
| 352 count int | 352 count int |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 401 }.Debugf(ctx, "Adding entries to BigTable.") | 401 }.Debugf(ctx, "Adding entries to BigTable.") |
| 402 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { | 402 if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { |
| 403 return 0, err | 403 return 0, err |
| 404 } | 404 } |
| 405 | 405 |
| 406 // Reset our buffer state. | 406 // Reset our buffer state. |
| 407 w.buf.Reset() | 407 w.buf.Reset() |
| 408 w.count = 0 | 408 w.count = 0 |
| 409 return flushCount, nil | 409 return flushCount, nil |
| 410 } | 410 } |
| OLD | NEW |