Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(567)

Unified Diff: appengine/logdog/coordinator/logStream.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/logStream.go
diff --git a/appengine/logdog/coordinator/logStream.go b/appengine/logdog/coordinator/logStream.go
index 4285bd3e4f25dd4708d0e45025d41964cb9ca775..457a28faf27b6729c1afabadfbb62b822bcd9fe7 100644
--- a/appengine/logdog/coordinator/logStream.go
+++ b/appengine/logdog/coordinator/logStream.go
@@ -18,20 +18,43 @@ import (
"github.com/luci/luci-go/common/proto/logdog/logpb"
)
+// currentLogStreamSchema is the current schema version of the LogStream.
+// Changes that are not backward-compatible should update this value so
+// migration logic and scripts can translate appropriately.
+const currentLogStreamSchema = "1"
+
// LogStreamState is the archival state of the log stream.
type LogStreamState int
const (
- // LSPending indicates that no archival has occurred yet.
- LSPending LogStreamState = iota
- // LSTerminated indicates that the log stream has received a terminal index
- // and is awaiting archival.
- LSTerminated
- // LSArchived indicates that the log stream has been successfully archived but
- // has not yet been cleaned up.
+ // LSStreaming indicates that the log stream is still streaming. This implies
+ // that no terminal index has been identified yet.
+ LSStreaming LogStreamState = iota
+ // LSArchiveTasked indicates that the log stream has had an archival task
+ // generated for it and is awaiting archival.
+ LSArchiveTasked
+ // LSArchived indicates that the log stream has been successfully archived.
LSArchived
)
+func (s LogStreamState) String() string {
+ switch s {
+ case LSStreaming:
+ return "STREAMING"
+ case LSArchiveTasked:
+ return "ARCHIVE_TASKED"
+ case LSArchived:
+ return "ARCHIVED"
+ default:
+ return fmt.Sprintf("UNKNOWN(%d)", s)
+ }
+}
+
+// Archived returns true if this LogStreamState represents a finished archival.
+func (s LogStreamState) Archived() bool {
+ return s >= LSArchived
+}
+
// LogStream is the primary datastore model containing information and state of
// an individual log stream.
//
@@ -63,9 +86,13 @@ const (
//
// Most of the values in QueryBase are static. Those that change can only be
// changed through service endpoint methods.
-//
-// LogStream's QueryBase is authortative.
type LogStream struct {
+ // Schema is the datastore schema version for this object. This can be used
+ // to facilitate schema migrations.
+ //
+ // The current schema is currentLogStreamSchema.
+ Schema string
+
// Prefix is this log stream's prefix value. Log streams with the same prefix
// are logically grouped.
Prefix string
@@ -87,9 +114,12 @@ type LogStream struct {
// Created is the time when this stream was created.
Created time.Time
- // Updated is the Coordinator's record of when this log stream was last
- // updated.
- Updated time.Time
+ // TerminatedTime is the Coordinator's record of when this log stream was
+ // terminated.
+ TerminatedTime time.Time `gae:",noindex"`
+ // ArchivedTime is the Coordinator's record of when this log stream was
+ // archived.
+ ArchivedTime time.Time `gae:",noindex"`
// ProtoVersion is the version string of the protobuf, as reported by the
// Collector (and ultimately self-identified by the Butler).
@@ -113,10 +143,20 @@ type LogStream struct {
// Source is the set of source strings sent by the Butler.
Source []string
- // TerminalIndex is the log stream index of the last log entry in the stream.
- // If the value is -1, the log is still streaming.
+ // TerminalIndex is the index of the last log entry in the stream.
+ //
+ // If this is <0, the log stream is either still streaming or has been
+ // archived with no log entries.
TerminalIndex int64 `gae:",noindex"`
+ // ArchiveLogEntryCount is the number of LogEntry records that were archived
+ // for this log stream.
+ //
+ // This is valid only if the log stream is Archived.
+ ArchiveLogEntryCount int64 `gae:",noindex"`
+ // ArchiveTaskName is the name of this LogStream's archival task.
+ ArchiveTaskName string `gae:",noindex"`
+
// ArchiveIndexURL is the Google Storage URL where the log stream's index is
// archived.
ArchiveIndexURL string `gae:",noindex"`
@@ -137,18 +177,25 @@ type LogStream struct {
// ArchiveDataSize is the size, in bytes, of the archived data. It will be
// zero if the file is not archived.
ArchiveDataSize int64 `gae:",noindex"`
- // ArchiveWhole is true if archival is complete and the archived log stream
- // was not missing any entries.
- ArchiveWhole bool
+ // ArchiveErrors is the number of errors encountered during archival. This
+ // will be incremented when incomplete archival is permitted, but the archival
+ // still failed due to an error (e.g., data corruption). If this exceeds a
+ // threshold, the stream may be marked archived even if the archival failed.
+ ArchiveErrors int
- // _ causes datastore to ignore unrecognized fields and strip them in future
- // writes.
- _ ds.PropertyMap `gae:"-,extra"`
+ // extra causes datastore to ignore unrecognized fields and strip them in
+ // future writes.
+ extra ds.PropertyMap `gae:"-,extra"`
// hashID is the cached generated ID from the stream's Prefix/Name fields. If
// this is populated, ID metadata will be retrieved from this field instead of
// generated.
hashID string
+
+ // noDSValidate is a testing parameter to instruct the LogStream not to
Vadim Sh. 2016/04/07 01:21:32 noDSValidate
dnj 2016/04/11 17:20:03 Done.
+ // validate when loading from or saving to datastore. It can be controlled by
+ // calling SetDSValidate().
+ noDSValidate bool
}
var _ interface {
@@ -221,11 +268,33 @@ func (s *LogStream) Load(pmap ds.PropertyMap) error {
delete(pmap, k)
}
- return ds.GetPLS(s).Load(pmap)
+ if err := ds.GetPLS(s).Load(pmap); err != nil {
+ return err
+ }
+
+ // Migrate schema (if needed), then validate.
+ if err := s.migrateSchema(); err != nil {
+ return err
+ }
+
+ if !s.noDSValidate {
+ if err := s.Validate(); err != nil {
+ return err
+ }
+ }
+ return nil
}
// Save implements ds.PropertyLoadSaver.
func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) {
+ if !s.noDSValidate {
+ if err := s.Validate(); err != nil {
+ return nil, err
+ }
+ s.Schema = currentLogStreamSchema
+ }
+
+ // Save default struct fields.
pmap, err := ds.GetPLS(s).Save(withMeta)
if err != nil {
return nil, err
@@ -284,15 +353,6 @@ func (s *LogStream) HashID() string {
return s.hashID
}
-// Put writes this LogStream to the Datastore. Before writing, it validates that
-// LogStream is complete.
-func (s *LogStream) Put(di ds.Interface) error {
- if err := s.Validate(); err != nil {
- return err
- }
- return di.Put(s)
-}
-
// Validate evaluates the state and data contents of the LogStream and returns
// an error if it is invalid.
func (s *LogStream) Validate() error {
@@ -311,11 +371,12 @@ func (s *LogStream) Validate() error {
if s.Created.IsZero() {
return errors.New("created time is not set")
}
- if s.Updated.IsZero() {
- return errors.New("updated time is not set")
+
+ if s.Terminated() && s.TerminatedTime.IsZero() {
+ return errors.New("log stream is terminated, but missing terminated time")
}
- if s.Updated.Before(s.Created) {
- return fmt.Errorf("updated time must be >= created time (%s < %s)", s.Updated, s.Created)
+ if s.Archived() && s.ArchivedTime.IsZero() {
+ return errors.New("log stream is archived, but missing archived time")
}
switch s.StreamType {
@@ -350,19 +411,21 @@ func (s *LogStream) DescriptorValue() (*logpb.LogStreamDescriptor, error) {
// Terminated returns true if this stream has been terminated.
func (s *LogStream) Terminated() bool {
- return s.State >= LSTerminated
+ if s.Archived() {
+ return true
+ }
+ return s.TerminalIndex >= 0
}
-// Archived returns true if this stream has been archived. A stream is archived
-// if it has any of its archival properties set.
+// Archived returns true if this stream has been archived.
func (s *LogStream) Archived() bool {
- return s.State >= LSArchived
+ return s.State.Archived()
}
-// ArchiveMatches tests if the supplied Stream, Index, and Data archival URLs
-// match the current values.
-func (s *LogStream) ArchiveMatches(sURL, iURL, dURL string) bool {
- return (s.ArchiveStreamURL == sURL && s.ArchiveIndexURL == iURL && s.ArchiveDataURL == dURL)
+// ArchiveComplete returns true if this stream has been archived and all of its
+// log entries were present.
+func (s *LogStream) ArchiveComplete() bool {
+ return (s.Archived() && s.ArchiveLogEntryCount == (s.TerminalIndex+1))
}
// LoadDescriptor loads the fields in the log stream descriptor into this
@@ -410,6 +473,14 @@ func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) {
return &desc, nil
}
+// SetDSValidate controls whether this LogStream is validated when it is loaded
+// from or saved to datastore.
+//
+// This is a testing parameter, and should NOT be used in production code.
+func (s *LogStream) SetDSValidate(v bool) {
+ s.noDSValidate = !v
+}
+
// normalizeHash takes a SHA256 hexadecimal string as input. It validates that
// it is a valid SHA256 hash and, if so, returns a normalized version that can
// be used as a log stream key.

Powered by Google App Engine
This is Rietveld 408576698