Chromium Code Reviews| Index: appengine/logdog/coordinator/logStream.go |
| diff --git a/appengine/logdog/coordinator/logStream.go b/appengine/logdog/coordinator/logStream.go |
| index 4285bd3e4f25dd4708d0e45025d41964cb9ca775..457a28faf27b6729c1afabadfbb62b822bcd9fe7 100644 |
| --- a/appengine/logdog/coordinator/logStream.go |
| +++ b/appengine/logdog/coordinator/logStream.go |
| @@ -18,20 +18,43 @@ import ( |
| "github.com/luci/luci-go/common/proto/logdog/logpb" |
| ) |
| +// currentLogStreamSchema is the current schema version of the LogStream. |
| +// Changes that are not backward-compatible should update this field so |
| +// migration logic and scripts can translate appropriately. |
| +const currentLogStreamSchema = "1" |
| + |
| // LogStreamState is the archival state of the log stream. |
| type LogStreamState int |
| const ( |
| - // LSPending indicates that no archival has occurred yet. |
| - LSPending LogStreamState = iota |
| - // LSTerminated indicates that the log stream has received a terminal index |
| - // and is awaiting archival. |
| - LSTerminated |
| - // LSArchived indicates that the log stream has been successfully archived but |
| - // has not yet been cleaned up. |
| + // LSStreaming indicates that the log stream is still streaming. This implies |
| + // that no terminal index has been identified yet. |
| + LSStreaming LogStreamState = iota |
| + // LSArchiveTasked indicates that the log stream has had an archival task |
| + // generated for it and is awaiting archival. |
| + LSArchiveTasked |
| + // LSArchived indicates that the log stream has been successfully archived. |
| LSArchived |
| ) |
| +func (s LogStreamState) String() string { |
| + switch s { |
| + case LSStreaming: |
| + return "STREAMING" |
| + case LSArchiveTasked: |
| + return "ARCHIVE_TASKED" |
| + case LSArchived: |
| + return "ARCHIVED" |
| + default: |
| + return fmt.Sprintf("UNKNOWN(%d)", s) |
| + } |
| +} |
| + |
| +// Archived returns true if this LogStreamState represents a finished archival. |
| +func (s LogStreamState) Archived() bool { |
| + return s >= LSArchived |
| +} |
| + |
| // LogStream is the primary datastore model containing information and state of |
| // an individual log stream. |
| // |
| @@ -63,9 +86,13 @@ const ( |
| // |
| // Most of the values in QueryBase are static. Those that change can only be |
| // changed through service endpoint methods. |
| -// |
| -// LogStream's QueryBase is authortative. |
| type LogStream struct { |
| + // Schema is the datastore schema version for this object. This can be used |
| + // to facilitate schema migrations. |
| + // |
| + // The current schema is currentLogStreamSchema. |
| + Schema string |
| + |
| // Prefix is this log stream's prefix value. Log streams with the same prefix |
| // are logically grouped. |
| Prefix string |
| @@ -87,9 +114,12 @@ type LogStream struct { |
| // Created is the time when this stream was created. |
| Created time.Time |
| - // Updated is the Coordinator's record of when this log stream was last |
| - // updated. |
| - Updated time.Time |
| + // TerminatedTime is the Coordinator's record of when this log stream was |
| + // terminated. |
| + TerminatedTime time.Time `gae:",noindex"` |
| + // ArchivedTime is the Coordinator's record of when this log stream was |
| + // archived. |
| + ArchivedTime time.Time `gae:",noindex"` |
| // ProtoVersion is the version string of the protobuf, as reported by the |
| // Collector (and ultimately self-identified by the Butler). |
| @@ -113,10 +143,20 @@ type LogStream struct { |
| // Source is the set of source strings sent by the Butler. |
| Source []string |
| - // TerminalIndex is the log stream index of the last log entry in the stream. |
| - // If the value is -1, the log is still streaming. |
| + // TerminalIndex is the index of the last log entry in the stream. |
| + // |
| + // If this is <0, the log stream is either still streaming or has been |
| + // archived with no log entries. |
| TerminalIndex int64 `gae:",noindex"` |
| + // ArchiveLogEntryCount is the number of LogEntry records that were archived |
| + // for this log stream. |
| + // |
| + // This is valid only if the log stream is Archived. |
| + ArchiveLogEntryCount int64 `gae:",noindex"` |
| + // ArchiveTaskName is the name of this LogStream's archival task. |
| + ArchiveTaskName string `gae:",noindex"` |
| + |
| // ArchiveIndexURL is the Google Storage URL where the log stream's index is |
| // archived. |
| ArchiveIndexURL string `gae:",noindex"` |
| @@ -137,18 +177,25 @@ type LogStream struct { |
| // ArchiveDataSize is the size, in bytes, of the archived data. It will be |
| // zero if the file is not archived. |
| ArchiveDataSize int64 `gae:",noindex"` |
| - // ArchiveWhole is true if archival is complete and the archived log stream |
| - // was not missing any entries. |
| - ArchiveWhole bool |
| + // ArchiveErrors is the number of errors encountered during archival. This |
| + // will be incremented when incomplete archival is permitted, but the archival |
| + // still failed due to an error (e.g., data corruption). If this exceeds a |
| + // threshold, the stream may be marked archived even if the archival failed. |
| + ArchiveErrors int |
| - // _ causes datastore to ignore unrecognized fields and strip them in future |
| - // writes. |
| - _ ds.PropertyMap `gae:"-,extra"` |
| + // extra causes datastore to ignore unrecognized fields and strip them in |
| + // future writes. |
| + extra ds.PropertyMap `gae:"-,extra"` |
| // hashID is the cached generated ID from the stream's Prefix/Name fields. If |
| // this is populated, ID metadata will be retrieved from this field instead of |
| // generated. |
| hashID string |
| + |
| + // noDatastoreValidate is a testing parameter to instruct the LogStream not to |
|
Vadim Sh.
2016/04/07 01:21:32
noDSValidate
dnj
2016/04/11 17:20:03
Done.
|
| + // validate before reading/writing to datastore. It can be controlled by |
| + // calling SetDSValidate(). |
| + noDSValidate bool |
| } |
| var _ interface { |
| @@ -221,11 +268,33 @@ func (s *LogStream) Load(pmap ds.PropertyMap) error { |
| delete(pmap, k) |
| } |
| - return ds.GetPLS(s).Load(pmap) |
| + if err := ds.GetPLS(s).Load(pmap); err != nil { |
| + return err |
| + } |
| + |
| + // Migrate schema (if needed), then validate. |
| + if err := s.migrateSchema(); err != nil { |
| + return err |
| + } |
| + |
| + if !s.noDSValidate { |
| + if err := s.Validate(); err != nil { |
| + return err |
| + } |
| + } |
| + return nil |
| } |
| // Save implements ds.PropertyLoadSaver. |
| func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) { |
| + if !s.noDSValidate { |
| + if err := s.Validate(); err != nil { |
| + return nil, err |
| + } |
| + s.Schema = currentLogStreamSchema |
| + } |
| + |
| + // Save default struct fields. |
| pmap, err := ds.GetPLS(s).Save(withMeta) |
| if err != nil { |
| return nil, err |
| @@ -284,15 +353,6 @@ func (s *LogStream) HashID() string { |
| return s.hashID |
| } |
| -// Put writes this LogStream to the Datastore. Before writing, it validates that |
| -// LogStream is complete. |
| -func (s *LogStream) Put(di ds.Interface) error { |
| - if err := s.Validate(); err != nil { |
| - return err |
| - } |
| - return di.Put(s) |
| -} |
| - |
| // Validate evaluates the state and data contents of the LogStream and returns |
| // an error if it is invalid. |
| func (s *LogStream) Validate() error { |
| @@ -311,11 +371,12 @@ func (s *LogStream) Validate() error { |
| if s.Created.IsZero() { |
| return errors.New("created time is not set") |
| } |
| - if s.Updated.IsZero() { |
| - return errors.New("updated time is not set") |
| + |
| + if s.Terminated() && s.TerminatedTime.IsZero() { |
| + return errors.New("log stream is terminated, but missing terminated time") |
| } |
| - if s.Updated.Before(s.Created) { |
| - return fmt.Errorf("updated time must be >= created time (%s < %s)", s.Updated, s.Created) |
| + if s.Archived() && s.ArchivedTime.IsZero() { |
| + return errors.New("log stream is archived, but missing archived time") |
| } |
| switch s.StreamType { |
| @@ -350,19 +411,21 @@ func (s *LogStream) DescriptorValue() (*logpb.LogStreamDescriptor, error) { |
| // Terminated returns true if this stream has been terminated. |
| func (s *LogStream) Terminated() bool { |
| - return s.State >= LSTerminated |
| + if s.Archived() { |
| + return true |
| + } |
| + return s.TerminalIndex >= 0 |
| } |
| -// Archived returns true if this stream has been archived. A stream is archived |
| -// if it has any of its archival properties set. |
| +// Archived returns true if this stream has been archived. |
| func (s *LogStream) Archived() bool { |
| - return s.State >= LSArchived |
| + return s.State.Archived() |
| } |
| -// ArchiveMatches tests if the supplied Stream, Index, and Data archival URLs |
| -// match the current values. |
| -func (s *LogStream) ArchiveMatches(sURL, iURL, dURL string) bool { |
| - return (s.ArchiveStreamURL == sURL && s.ArchiveIndexURL == iURL && s.ArchiveDataURL == dURL) |
| +// ArchiveComplete returns true if this stream has been archived and all of its |
| +// log entries were present. |
| +func (s *LogStream) ArchiveComplete() bool { |
| + return (s.Archived() && s.ArchiveLogEntryCount == (s.TerminalIndex+1)) |
| } |
| // LoadDescriptor loads the fields in the log stream descriptor into this |
| @@ -410,6 +473,14 @@ func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) { |
| return &desc, nil |
| } |
| +// SetDSValidate controls whether this LogStream is validated prior to being |
| +// read from or written to datastore. |
| +// |
| +// This is a testing parameter, and should NOT be used in production code. |
| +func (s *LogStream) SetDSValidate(v bool) { |
| + s.noDSValidate = !v |
| +} |
| + |
| // normalizeHash takes a SHA256 hexadecimal string as input. It validates that |
| // it is a valid SHA256 hash and, if so, returns a normalized version that can |
| // be used as a log stream key. |