| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/sha256" | 8 "crypto/sha256" |
| 9 "encoding/hex" | 9 "encoding/hex" |
| 10 "errors" | 10 "errors" |
| 11 "fmt" | 11 "fmt" |
| 12 "strings" | 12 "strings" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "github.com/golang/protobuf/proto" | 15 "github.com/golang/protobuf/proto" |
| 16 ds "github.com/luci/gae/service/datastore" | 16 ds "github.com/luci/gae/service/datastore" |
| 17 "github.com/luci/luci-go/common/logdog/types" | 17 "github.com/luci/luci-go/common/logdog/types" |
| 18 "github.com/luci/luci-go/common/proto/logdog/logpb" | 18 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 19 ) | 19 ) |
| 20 | 20 |
| 21 // currentLogStreamSchema is the current schema version of the LogStream. |
| 22 // Changes that are not backward-compatible should update this field so |
| 23 // migration logic and scripts can translate appropriately. |
| 24 const currentLogStreamSchema = "1" |
| 25 |
| 21 // LogStreamState is the archival state of the log stream. | 26 // LogStreamState is the archival state of the log stream. |
| 22 type LogStreamState int | 27 type LogStreamState int |
| 23 | 28 |
| 24 const ( | 29 const ( |
| 25 » // LSPending indicates that no archival has occurred yet. | 30 » // LSStreaming indicates that the log stream is still streaming. This im
plies |
| 26 » LSPending LogStreamState = iota | 31 » // that no terminal index has been identified yet. |
| 27 » // LSTerminated indicates that the log stream has received a terminal in
dex | 32 » LSStreaming LogStreamState = iota |
| 28 » // and is awaiting archival. | 33 » // LSArchiveTasked indicates that the log stream has had an archival tas
k |
| 29 » LSTerminated | 34 » // generated for it and is awaiting archival. |
| 30 » // LSArchived indicates that the log stream has been successfully archiv
ed but | 35 » LSArchiveTasked |
| 31 » // has not yet been cleaned up. | 36 » // LSArchived indicates that the log stream has been successfully archiv
ed. |
| 32 LSArchived | 37 LSArchived |
| 33 ) | 38 ) |
| 34 | 39 |
| 40 func (s LogStreamState) String() string { |
| 41 switch s { |
| 42 case LSStreaming: |
| 43 return "STREAMING" |
| 44 case LSArchiveTasked: |
| 45 return "ARCHIVE_TASKED" |
| 46 case LSArchived: |
| 47 return "ARCHIVED" |
| 48 default: |
| 49 return fmt.Sprintf("UNKNOWN(%d)", s) |
| 50 } |
| 51 } |
| 52 |
| 53 // Archived returns true if this LogStreamState represents a finished archival. |
| 54 func (s LogStreamState) Archived() bool { |
| 55 return s >= LSArchived |
| 56 } |
| 57 |
| 35 // LogStream is the primary datastore model containing information and state of | 58 // LogStream is the primary datastore model containing information and state of |
| 36 // an individual log stream. | 59 // an individual log stream. |
| 37 // | 60 // |
| 38 // This structure contains the standard queryable fields, and is the source of | 61 // This structure contains the standard queryable fields, and is the source of |
| 39 // truth for log stream state. Writes to LogStream should be done via Put, which | 62 // truth for log stream state. Writes to LogStream should be done via Put, which |
| 40 // will ensure that the LogStream's related query objects are kept in sync. | 63 // will ensure that the LogStream's related query objects are kept in sync. |
| 41 // | 64 // |
| 42 // This structure has additional datastore fields imposed by the | 65 // This structure has additional datastore fields imposed by the |
| 43 // PropertyLoadSaver. These fields enable querying against some of the complex | 66 // PropertyLoadSaver. These fields enable querying against some of the complex |
| 44 // data types: | 67 // data types: |
| (...skipping 11 matching lines...) Expand all Loading... |
| 56 // | 79 // |
| 57 // - _Tags is a string slice containing: | 80 // - _Tags is a string slice containing: |
| 58 // - KEY=[VALUE] key/value tags. | 81 // - KEY=[VALUE] key/value tags. |
| 59 // - KEY key presence tags. | 82 // - KEY key presence tags. |
| 60 // | 83 // |
| 61 // - _Terminated is true if the LogStream has been terminated. | 84 // - _Terminated is true if the LogStream has been terminated. |
| 62 // - _Archived is true if the LogStream has been archived. | 85 // - _Archived is true if the LogStream has been archived. |
| 63 // | 86 // |
| 64 // Most of the values in QueryBase are static. Those that change can only be | 87 // Most of the values in QueryBase are static. Those that change can only be |
| 65 // changed through service endpoint methods. | 88 // changed through service endpoint methods. |
| 66 // | |
| 67 // LogStream's QueryBase is authortative. | |
| 68 type LogStream struct { | 89 type LogStream struct { |
| 90 // HashID is the LogStream ID. It is generated from the stream's Prefix/
Name |
| 91 // fields. |
| 92 HashID string `gae:"$id"` |
| 93 |
| 94 // Schema is the datastore schema version for this object. This can be u
sed |
| 95 // to facilitate schema migrations. |
| 96 // |
| 97 // The current schema is currentLogStreamSchema. |
| 98 Schema string |
| 99 |
| 69 // Prefix is this log stream's prefix value. Log streams with the same p
refix | 100 // Prefix is this log stream's prefix value. Log streams with the same p
refix |
| 70 // are logically grouped. | 101 // are logically grouped. |
| 102 // |
| 103 // This value should not be changed once populated, as it will invalidat
e the |
| 104 // HashID. |
| 71 Prefix string | 105 Prefix string |
| 72 // Name is the unique name of this log stream within the Prefix scope. | 106 // Name is the unique name of this log stream within the Prefix scope. |
| 107 // |
| 108 // This value should not be changed once populated, as it will invalidat
e the |
| 109 // HashID. |
| 73 Name string | 110 Name string |
| 74 | 111 |
| 75 // State is the log stream's current state. | 112 // State is the log stream's current state. |
| 76 State LogStreamState | 113 State LogStreamState |
| 77 | 114 |
| 78 // Purged, if true, indicates that this log stream has been marked as pu
rged. | 115 // Purged, if true, indicates that this log stream has been marked as pu
rged. |
| 79 // Non-administrative queries and requests for this stream will operate
as | 116 // Non-administrative queries and requests for this stream will operate
as |
| 80 // if this entry doesn't exist. | 117 // if this entry doesn't exist. |
| 81 Purged bool | 118 Purged bool |
| 82 | 119 |
| 83 // Secret is the Butler secret value for this stream. | 120 // Secret is the Butler secret value for this stream. |
| 84 // | 121 // |
| 85 // This value may only be returned to LogDog services; it is not user-vi
sible. | 122 // This value may only be returned to LogDog services; it is not user-vi
sible. |
| 86 Secret []byte `gae:",noindex"` | 123 Secret []byte `gae:",noindex"` |
| 87 | 124 |
| 88 // Created is the time when this stream was created. | 125 // Created is the time when this stream was created. |
| 89 Created time.Time | 126 Created time.Time |
| 90 » // Updated is the Coordinator's record of when this log stream was last | 127 » // TerminatedTime is the Coordinator's record of when this log stream wa
s |
| 91 » // updated. | 128 » // terminated. |
| 92 » Updated time.Time | 129 » TerminatedTime time.Time `gae:",noindex"` |
| 130 » // ArchivedTime is the Coordinator's record of when this log stream was |
| 131 » // archived. |
| 132 » ArchivedTime time.Time `gae:",noindex"` |
| 93 | 133 |
| 94 // ProtoVersion is the version string of the protobuf, as reported by th
e | 134 // ProtoVersion is the version string of the protobuf, as reported by th
e |
| 95 // Collector (and ultimately self-identified by the Butler). | 135 // Collector (and ultimately self-identified by the Butler). |
| 96 ProtoVersion string | 136 ProtoVersion string |
| 97 // Descriptor is the binary protobuf data LogStreamDescriptor. | 137 // Descriptor is the binary protobuf data LogStreamDescriptor. |
| 98 Descriptor []byte `gae:",noindex"` | 138 Descriptor []byte `gae:",noindex"` |
| 99 // ContentType is the MIME-style content type string for this stream. | 139 // ContentType is the MIME-style content type string for this stream. |
| 100 ContentType string | 140 ContentType string |
| 101 // StreamType is the data type of the stream. | 141 // StreamType is the data type of the stream. |
| 102 StreamType logpb.StreamType | 142 StreamType logpb.StreamType |
| 103 // Timestamp is the Descriptor's recorded client-side timestamp. | 143 // Timestamp is the Descriptor's recorded client-side timestamp. |
| 104 Timestamp time.Time | 144 Timestamp time.Time |
| 105 | 145 |
| 106 // Tags is a set of arbitrary key/value tags associated with this stream
. Tags | 146 // Tags is a set of arbitrary key/value tags associated with this stream
. Tags |
| 107 // can be queried against. | 147 // can be queried against. |
| 108 // | 148 // |
| 109 // The serialization/deserialization is handled manually in order to ena
ble | 149 // The serialization/deserialization is handled manually in order to ena
ble |
| 110 // key/value queries. | 150 // key/value queries. |
| 111 Tags TagMap `gae:"-"` | 151 Tags TagMap `gae:"-"` |
| 112 | 152 |
| 113 // Source is the set of source strings sent by the Butler. | 153 // Source is the set of source strings sent by the Butler. |
| 114 Source []string | 154 Source []string |
| 115 | 155 |
| 116 » // TerminalIndex is the log stream index of the last log entry in the st
ream. | 156 » // TerminalIndex is the index of the last log entry in the stream. |
| 117 » // If the value is -1, the log is still streaming. | 157 » // |
| 158 » // If this is <0, the log stream is either still streaming or has been |
| 159 » // archived with no log entries. |
| 118 TerminalIndex int64 `gae:",noindex"` | 160 TerminalIndex int64 `gae:",noindex"` |
| 119 | 161 |
| 162 // ArchiveLogEntryCount is the number of LogEntry records that were arch
ived |
| 163 // for this log stream. |
| 164 // |
| 165 // This is valid only if the log stream is Archived. |
| 166 ArchiveLogEntryCount int64 `gae:",noindex"` |
| 167 // ArchivalKey is the archival key for this log stream. This is used to |
| 168 // differentiate the real archival request from those that were dispatch
ed, |
| 169 // but that ultimately failed to update state. |
| 170 ArchivalKey []byte `gae:",noindex"` |
| 171 |
| 120 // ArchiveIndexURL is the Google Storage URL where the log stream's inde
x is | 172 // ArchiveIndexURL is the Google Storage URL where the log stream's inde
x is |
| 121 // archived. | 173 // archived. |
| 122 ArchiveIndexURL string `gae:",noindex"` | 174 ArchiveIndexURL string `gae:",noindex"` |
| 123 // ArchiveIndexSize is the size, in bytes, of the archived Index. It wil
l be | 175 // ArchiveIndexSize is the size, in bytes, of the archived Index. It wil
l be |
| 124 // zero if the file is not archived. | 176 // zero if the file is not archived. |
| 125 ArchiveIndexSize int64 `gae:",noindex"` | 177 ArchiveIndexSize int64 `gae:",noindex"` |
| 126 // ArchiveStreamURL is the Google Storage URL where the log stream's raw | 178 // ArchiveStreamURL is the Google Storage URL where the log stream's raw |
| 127 // stream data is archived. If this is not empty, the log stream is cons
idered | 179 // stream data is archived. If this is not empty, the log stream is cons
idered |
| 128 // archived. | 180 // archived. |
| 129 ArchiveStreamURL string `gae:",noindex"` | 181 ArchiveStreamURL string `gae:",noindex"` |
| 130 // ArchiveStreamSize is the size, in bytes, of the archived stream. It w
ill be | 182 // ArchiveStreamSize is the size, in bytes, of the archived stream. It w
ill be |
| 131 // zero if the file is not archived. | 183 // zero if the file is not archived. |
| 132 ArchiveStreamSize int64 `gae:",noindex"` | 184 ArchiveStreamSize int64 `gae:",noindex"` |
| 133 // ArchiveDataURL is the Google Storage URL where the log stream's assem
bled | 185 // ArchiveDataURL is the Google Storage URL where the log stream's assem
bled |
| 134 // data is archived. If this is not empty, the log stream is considered | 186 // data is archived. If this is not empty, the log stream is considered |
| 135 // archived. | 187 // archived. |
| 136 ArchiveDataURL string `gae:",noindex"` | 188 ArchiveDataURL string `gae:",noindex"` |
| 137 // ArchiveDataSize is the size, in bytes, of the archived data. It will
be | 189 // ArchiveDataSize is the size, in bytes, of the archived data. It will
be |
| 138 // zero if the file is not archived. | 190 // zero if the file is not archived. |
| 139 ArchiveDataSize int64 `gae:",noindex"` | 191 ArchiveDataSize int64 `gae:",noindex"` |
| 140 // ArchiveWhole is true if archival is complete and the archived log str
eam | |
| 141 // was not missing any entries. | |
| 142 ArchiveWhole bool | |
| 143 | 192 |
| 144 » // _ causes datastore to ignore unrecognized fields and strip them in fu
ture | 193 » // extra causes datastore to ignore unrecognized fields and strip them i
n |
| 145 » // writes. | 194 » // future writes. |
| 146 » _ ds.PropertyMap `gae:"-,extra"` | 195 » extra ds.PropertyMap `gae:"-,extra"` |
| 147 | 196 |
| 148 » // hashID is the cached generated ID from the stream's Prefix/Name field
s. If | 197 » // noDSValidate is a testing parameter to instruct the LogStream not to |
| 149 » // this is populated, ID metadata will be retrieved from this field inst
ead of | 198 » // validate before reading/writing to datastore. It can be controlled by |
| 150 » // generated. | 199 » // calling SetDSValidate(). |
| 151 » hashID string | 200 » noDSValidate bool |
| 152 } | 201 } |
| 153 | 202 |
| 154 var _ interface { | 203 var _ interface { |
| 155 ds.PropertyLoadSaver | 204 ds.PropertyLoadSaver |
| 156 ds.MetaGetterSetter | |
| 157 } = (*LogStream)(nil) | 205 } = (*LogStream)(nil) |
| 158 | 206 |
| 159 // NewLogStream returns a LogStream instance with its ID field initialized based | 207 // NewLogStream returns a LogStream instance with its ID field initialized based |
| 160 // on the supplied path. | 208 // on the supplied path. |
| 161 // | 209 // |
| 162 // The supplied value is a LogDog stream path or a hash of the LogDog stream | 210 // The supplied value is a LogDog stream path or a hash of the LogDog stream |
| 163 // path. | 211 // path. |
| 164 func NewLogStream(value string) (*LogStream, error) { | 212 func NewLogStream(value string) (*LogStream, error) { |
| 165 path := types.StreamPath(value) | 213 path := types.StreamPath(value) |
| 166 if err := path.Validate(); err != nil { | 214 if err := path.Validate(); err != nil { |
| 167 // If it's not a path, see if it's a SHA256 sum. | 215 // If it's not a path, see if it's a SHA256 sum. |
| 168 hash, hashErr := normalizeHash(value) | 216 hash, hashErr := normalizeHash(value) |
| 169 if hashErr != nil { | 217 if hashErr != nil { |
| 170 return nil, fmt.Errorf("invalid path (%s) and hash (%s)"
, err, hashErr) | 218 return nil, fmt.Errorf("invalid path (%s) and hash (%s)"
, err, hashErr) |
| 171 } | 219 } |
| 172 | 220 |
| 173 // Load this LogStream with its SHA256 hash directly. This strea
m will not | 221 // Load this LogStream with its SHA256 hash directly. This strea
m will not |
| 174 // have its Prefix/Name fields populated until it's loaded from
datastore. | 222 // have its Prefix/Name fields populated until it's loaded from
datastore. |
| 175 return LogStreamFromID(hash), nil | 223 return LogStreamFromID(hash), nil |
| 176 } | 224 } |
| 177 | 225 |
| 178 return LogStreamFromPath(path), nil | 226 return LogStreamFromPath(path), nil |
| 179 } | 227 } |
| 180 | 228 |
| 181 // LogStreamFromID returns an empty LogStream instance with a known hash ID. | 229 // LogStreamFromID returns an empty LogStream instance with a known hash ID. |
| 182 func LogStreamFromID(hashID string) *LogStream { | 230 func LogStreamFromID(hashID string) *LogStream { |
| 183 return &LogStream{ | 231 return &LogStream{ |
| 184 » » hashID: hashID, | 232 » » HashID: hashID, |
| 185 } | 233 } |
| 186 } | 234 } |
| 187 | 235 |
| 188 // LogStreamFromPath returns an empty LogStream instance initialized from a | 236 // LogStreamFromPath returns an empty LogStream instance initialized from a |
| 189 // known path value. | 237 // known path value. |
| 190 // | 238 // |
| 191 // The supplied path is assumed to be valid and is not checked. | 239 // The supplied path is assumed to be valid and is not checked. |
| 192 func LogStreamFromPath(path types.StreamPath) *LogStream { | 240 func LogStreamFromPath(path types.StreamPath) *LogStream { |
| 193 // Load the prefix/name fields into the log stream. | 241 // Load the prefix/name fields into the log stream. |
| 194 prefix, name := path.Split() | 242 prefix, name := path.Split() |
| 195 » return &LogStream{ | 243 » ls := LogStream{ |
| 196 Prefix: string(prefix), | 244 Prefix: string(prefix), |
| 197 Name: string(name), | 245 Name: string(name), |
| 198 } | 246 } |
| 247 ls.recalculateHashID() |
| 248 return &ls |
| 199 } | 249 } |
| 200 | 250 |
| 201 // Path returns the LogDog path for this log stream. | 251 // Path returns the LogDog path for this log stream. |
| 202 func (s *LogStream) Path() types.StreamPath { | 252 func (s *LogStream) Path() types.StreamPath { |
| 203 return types.StreamName(s.Prefix).Join(types.StreamName(s.Name)) | 253 return types.StreamName(s.Prefix).Join(types.StreamName(s.Name)) |
| 204 } | 254 } |
| 205 | 255 |
| 206 // Load implements ds.PropertyLoadSaver. | 256 // Load implements ds.PropertyLoadSaver. |
| 207 func (s *LogStream) Load(pmap ds.PropertyMap) error { | 257 func (s *LogStream) Load(pmap ds.PropertyMap) error { |
| 208 // Handle custom properties. Consume them before using the default | 258 // Handle custom properties. Consume them before using the default |
| 209 // PropertyLoadSaver. | 259 // PropertyLoadSaver. |
| 210 for k, v := range pmap { | 260 for k, v := range pmap { |
| 211 if !strings.HasPrefix(k, "_") { | 261 if !strings.HasPrefix(k, "_") { |
| 212 continue | 262 continue |
| 213 } | 263 } |
| 214 | 264 |
| 215 switch k { | 265 switch k { |
| 216 case "_Tags": | 266 case "_Tags": |
| 217 // Load the tag map. Ignore errors. | 267 // Load the tag map. Ignore errors. |
| 218 tm, _ := tagMapFromProperties(v) | 268 tm, _ := tagMapFromProperties(v) |
| 219 s.Tags = tm | 269 s.Tags = tm |
| 220 } | 270 } |
| 221 delete(pmap, k) | 271 delete(pmap, k) |
| 222 } | 272 } |
| 223 | 273 |
| 224 » return ds.GetPLS(s).Load(pmap) | 274 » if err := ds.GetPLS(s).Load(pmap); err != nil { |
| 275 » » return err |
| 276 » } |
| 277 |
| 278 » // Migrate schema (if needed), then validate. |
| 279 » if err := s.migrateSchema(); err != nil { |
| 280 » » return err |
| 281 » } |
| 282 |
| 283 » // Validate the log stream. Don't enforce HashID correctness, since |
| 284 » // datastore hasn't populated that field yet. |
| 285 » if !s.noDSValidate { |
| 286 » » if err := s.validateImpl(false); err != nil { |
| 287 » » » return err |
| 288 » » } |
| 289 » } |
| 290 » return nil |
| 225 } | 291 } |
| 226 | 292 |
| 227 // Save implements ds.PropertyLoadSaver. | 293 // Save implements ds.PropertyLoadSaver. |
| 228 func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) { | 294 func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) { |
| 295 if !s.noDSValidate { |
| 296 if err := s.validateImpl(true); err != nil { |
| 297 return nil, err |
| 298 } |
| 299 } |
| 300 s.Schema = currentLogStreamSchema |
| 301 |
| 302 // Save default struct fields. |
| 229 pmap, err := ds.GetPLS(s).Save(withMeta) | 303 pmap, err := ds.GetPLS(s).Save(withMeta) |
| 230 if err != nil { | 304 if err != nil { |
| 231 return nil, err | 305 return nil, err |
| 232 } | 306 } |
| 233 | 307 |
| 234 // Encode _Tags. | 308 // Encode _Tags. |
| 235 pmap["_Tags"], err = s.Tags.toProperties() | 309 pmap["_Tags"], err = s.Tags.toProperties() |
| 236 if err != nil { | 310 if err != nil { |
| 237 return nil, fmt.Errorf("failed to encode tags: %v", err) | 311 return nil, fmt.Errorf("failed to encode tags: %v", err) |
| 238 } | 312 } |
| 239 | 313 |
| 240 // Generate our path components, "_C". | 314 // Generate our path components, "_C". |
| 241 pmap["_C"] = generatePathComponents(s.Prefix, s.Name) | 315 pmap["_C"] = generatePathComponents(s.Prefix, s.Name) |
| 242 | 316 |
| 243 // Add our derived statuses. | 317 // Add our derived statuses. |
| 244 pmap["_Terminated"] = []ds.Property{ds.MkProperty(s.Terminated())} | 318 pmap["_Terminated"] = []ds.Property{ds.MkProperty(s.Terminated())} |
| 245 pmap["_Archived"] = []ds.Property{ds.MkProperty(s.Archived())} | 319 pmap["_Archived"] = []ds.Property{ds.MkProperty(s.Archived())} |
| 246 | 320 |
| 247 return pmap, nil | 321 return pmap, nil |
| 248 } | 322 } |
| 249 | 323 |
| 250 // GetMeta implements ds.MetaGetterSetter. | 324 // recalculateHashID calculates the log stream's hash ID from its Prefix/Name |
| 251 func (s *LogStream) GetMeta(key string) (interface{}, bool) { | 325 // fields, which must be populated else this function will panic. |
| 252 » switch key { | 326 // |
| 253 » case "id": | 327 // The value is loaded into its HashID field. |
| 254 » » return s.HashID(), true | 328 func (s *LogStream) recalculateHashID() { |
| 255 | 329 » s.HashID = s.getHashID() |
| 256 » default: | |
| 257 » » return ds.GetPLS(s).GetMeta(key) | |
| 258 » } | |
| 259 } | 330 } |
| 260 | 331 |
| 261 // GetAllMeta implements ds.MetaGetterSetter. | 332 // recalculateHashID calculates the log stream's hash ID from its Prefix/Name |
| 262 func (s *LogStream) GetAllMeta() ds.PropertyMap { | 333 // fields, which must be populated else this function will panic. |
| 263 » pmap := ds.GetPLS(s).GetAllMeta() | 334 func (s *LogStream) getHashID() string { |
| 264 » pmap.SetMeta("id", ds.MkProperty(s.HashID())) | 335 » hash := sha256.Sum256([]byte(s.Path())) |
| 265 » return pmap | 336 » return hex.EncodeToString(hash[:]) |
| 266 } | |
| 267 | |
| 268 // SetMeta implements ds.MetaGetterSetter. | |
| 269 func (s *LogStream) SetMeta(key string, val interface{}) bool { | |
| 270 » return ds.GetPLS(s).SetMeta(key, val) | |
| 271 } | |
| 272 | |
| 273 // HashID generates and populates the hashID field of a LogStream. This | |
| 274 // is the hash of the log stream's full path. | |
| 275 func (s *LogStream) HashID() string { | |
| 276 » if s.hashID == "" { | |
| 277 » » if s.Prefix == "" || s.Name == "" { | |
| 278 » » » panic("cannot generate ID hash: Prefix and Name are not
populated") | |
| 279 » » } | |
| 280 | |
| 281 » » hash := sha256.Sum256([]byte(s.Path())) | |
| 282 » » s.hashID = hex.EncodeToString(hash[:]) | |
| 283 » } | |
| 284 » return s.hashID | |
| 285 } | |
| 286 | |
| 287 // Put writes this LogStream to the Datastore. Before writing, it validates that | |
| 288 // LogStream is complete. | |
| 289 func (s *LogStream) Put(di ds.Interface) error { | |
| 290 » if err := s.Validate(); err != nil { | |
| 291 » » return err | |
| 292 » } | |
| 293 » return di.Put(s) | |
| 294 } | 337 } |
| 295 | 338 |
| 296 // Validate evaluates the state and data contents of the LogStream and returns | 339 // Validate evaluates the state and data contents of the LogStream and returns |
| 297 // an error if it is invalid. | 340 // an error if it is invalid. |
| 298 func (s *LogStream) Validate() error { | 341 func (s *LogStream) Validate() error { |
| 342 return s.validateImpl(true) |
| 343 } |
| 344 |
| 345 func (s *LogStream) validateImpl(enforceHashID bool) error { |
| 346 if enforceHashID { |
| 347 // Make sure our Prefix and Name match the Hash ID. |
| 348 if hid := s.getHashID(); hid != s.HashID { |
| 349 return fmt.Errorf("hash IDs don't match (%q != %q)", hid
, s.HashID) |
| 350 } |
| 351 } |
| 352 |
| 299 if err := types.StreamName(s.Prefix).Validate(); err != nil { | 353 if err := types.StreamName(s.Prefix).Validate(); err != nil { |
| 300 return fmt.Errorf("invalid prefix: %s", err) | 354 return fmt.Errorf("invalid prefix: %s", err) |
| 301 } | 355 } |
| 302 if err := types.StreamName(s.Name).Validate(); err != nil { | 356 if err := types.StreamName(s.Name).Validate(); err != nil { |
| 303 return fmt.Errorf("invalid name: %s", err) | 357 return fmt.Errorf("invalid name: %s", err) |
| 304 } | 358 } |
| 305 if len(s.Secret) != types.StreamSecretLength { | 359 if len(s.Secret) != types.StreamSecretLength { |
| 306 return fmt.Errorf("invalid secret length (%d != %d)", len(s.Secr
et), types.StreamSecretLength) | 360 return fmt.Errorf("invalid secret length (%d != %d)", len(s.Secr
et), types.StreamSecretLength) |
| 307 } | 361 } |
| 308 if s.ContentType == "" { | 362 if s.ContentType == "" { |
| 309 return errors.New("empty content type") | 363 return errors.New("empty content type") |
| 310 } | 364 } |
| 311 if s.Created.IsZero() { | 365 if s.Created.IsZero() { |
| 312 return errors.New("created time is not set") | 366 return errors.New("created time is not set") |
| 313 } | 367 } |
| 314 » if s.Updated.IsZero() { | 368 |
| 315 » » return errors.New("updated time is not set") | 369 » if s.Terminated() && s.TerminatedTime.IsZero() { |
| 370 » » return errors.New("log stream is terminated, but missing termina
ted time") |
| 316 } | 371 } |
| 317 » if s.Updated.Before(s.Created) { | 372 » if s.Archived() && s.ArchivedTime.IsZero() { |
| 318 » » return fmt.Errorf("updated time must be >= created time (%s < %s
)", s.Updated, s.Created) | 373 » » return errors.New("log stream is archived, but missing archived
time") |
| 319 } | 374 } |
| 320 | 375 |
| 321 switch s.StreamType { | 376 switch s.StreamType { |
| 322 case logpb.StreamType_TEXT, logpb.StreamType_BINARY, logpb.StreamType_DA
TAGRAM: | 377 case logpb.StreamType_TEXT, logpb.StreamType_BINARY, logpb.StreamType_DA
TAGRAM: |
| 323 break | 378 break |
| 324 | 379 |
| 325 default: | 380 default: |
| 326 return fmt.Errorf("unsupported stream type: %v", s.StreamType) | 381 return fmt.Errorf("unsupported stream type: %v", s.StreamType) |
| 327 } | 382 } |
| 328 | 383 |
| (...skipping 14 matching lines...) Expand all Loading... |
| 343 func (s *LogStream) DescriptorValue() (*logpb.LogStreamDescriptor, error) { | 398 func (s *LogStream) DescriptorValue() (*logpb.LogStreamDescriptor, error) { |
| 344 pb := logpb.LogStreamDescriptor{} | 399 pb := logpb.LogStreamDescriptor{} |
| 345 if err := proto.Unmarshal(s.Descriptor, &pb); err != nil { | 400 if err := proto.Unmarshal(s.Descriptor, &pb); err != nil { |
| 346 return nil, err | 401 return nil, err |
| 347 } | 402 } |
| 348 return &pb, nil | 403 return &pb, nil |
| 349 } | 404 } |
| 350 | 405 |
| 351 // Terminated returns true if this stream has been terminated. | 406 // Terminated returns true if this stream has been terminated. |
| 352 func (s *LogStream) Terminated() bool { | 407 func (s *LogStream) Terminated() bool { |
| 353 » return s.State >= LSTerminated | 408 » if s.Archived() { |
| 409 » » return true |
| 410 » } |
| 411 » return s.TerminalIndex >= 0 |
| 354 } | 412 } |
| 355 | 413 |
| 356 // Archived returns true if this stream has been archived. A stream is archived | 414 // Archived returns true if this stream has been archived. |
| 357 // if it has any of its archival properties set. | |
| 358 func (s *LogStream) Archived() bool { | 415 func (s *LogStream) Archived() bool { |
| 359 » return s.State >= LSArchived | 416 » return s.State.Archived() |
| 360 } | 417 } |
| 361 | 418 |
| 362 // ArchiveMatches tests if the supplied Stream, Index, and Data archival URLs | 419 // ArchiveComplete returns true if this stream has been archived and all of its |
| 363 // match the current values. | 420 // log entries were present. |
| 364 func (s *LogStream) ArchiveMatches(sURL, iURL, dURL string) bool { | 421 func (s *LogStream) ArchiveComplete() bool { |
| 365 » return (s.ArchiveStreamURL == sURL && s.ArchiveIndexURL == iURL && s.Arc
hiveDataURL == dURL) | 422 » return (s.Archived() && s.ArchiveLogEntryCount == (s.TerminalIndex+1)) |
| 366 } | 423 } |
| 367 | 424 |
| 368 // LoadDescriptor loads the fields in the log stream descriptor into this | 425 // LoadDescriptor loads the fields in the log stream descriptor into this |
| 369 // LogStream entry. These fields are: | 426 // LogStream entry. These fields are: |
| 370 // - Prefix | 427 // - Prefix |
| 371 // - Name | 428 // - Name |
| 372 // - ContentType | 429 // - ContentType |
| 373 // - StreamType | 430 // - StreamType |
| 374 // - Descriptor | 431 // - Descriptor |
| 375 // - Timestamp | 432 // - Timestamp |
| 376 // - Tags | 433 // - Tags |
| 377 func (s *LogStream) LoadDescriptor(desc *logpb.LogStreamDescriptor) error { | 434 func (s *LogStream) LoadDescriptor(desc *logpb.LogStreamDescriptor) error { |
| 435 // If the descriptor's Prefix/Name don't match ours, refuse to load it. |
| 436 if desc.Prefix != s.Prefix { |
| 437 return fmt.Errorf("prefixes don't match (%q != %q)", desc.Prefix
, s.Prefix) |
| 438 } |
| 439 if desc.Name != s.Name { |
| 440 return fmt.Errorf("names don't match (%q != %q)", desc.Name, s.N
ame) |
| 441 } |
| 442 |
| 378 if err := desc.Validate(true); err != nil { | 443 if err := desc.Validate(true); err != nil { |
| 379 return fmt.Errorf("invalid descriptor: %v", err) | 444 return fmt.Errorf("invalid descriptor: %v", err) |
| 380 } | 445 } |
| 381 | 446 |
| 382 pb, err := proto.Marshal(desc) | 447 pb, err := proto.Marshal(desc) |
| 383 if err != nil { | 448 if err != nil { |
| 384 return fmt.Errorf("failed to marshal descriptor: %v", err) | 449 return fmt.Errorf("failed to marshal descriptor: %v", err) |
| 385 } | 450 } |
| 386 | 451 |
| 387 s.Prefix = desc.Prefix | 452 s.Prefix = desc.Prefix |
| (...skipping 15 matching lines...) Expand all Loading... |
| 403 // DescriptorProto unmarshals a LogStreamDescriptor from the stream's Descriptor | 468 // DescriptorProto unmarshals a LogStreamDescriptor from the stream's Descriptor |
| 404 // field. It will return an error if the unmarshalling fails. | 469 // field. It will return an error if the unmarshalling fails. |
| 405 func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) { | 470 func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) { |
| 406 desc := logpb.LogStreamDescriptor{} | 471 desc := logpb.LogStreamDescriptor{} |
| 407 if err := proto.Unmarshal(s.Descriptor, &desc); err != nil { | 472 if err := proto.Unmarshal(s.Descriptor, &desc); err != nil { |
| 408 return nil, err | 473 return nil, err |
| 409 } | 474 } |
| 410 return &desc, nil | 475 return &desc, nil |
| 411 } | 476 } |
| 412 | 477 |
| 478 // SetDSValidate controls whether this LogStream is validated prior to being |
| 479 // read from or written to datastore. |
| 480 // |
| 481 // This is a testing parameter, and should NOT be used in production code. |
| 482 func (s *LogStream) SetDSValidate(v bool) { |
| 483 s.noDSValidate = !v |
| 484 } |
| 485 |
| 413 // normalizeHash takes a SHA256 hexadecimal string as input. It validates that | 486 // normalizeHash takes a SHA256 hexadecimal string as input. It validates that |
| 414 // it is a valid SHA256 hash and, if so, returns a normalized version that can | 487 // it is a valid SHA256 hash and, if so, returns a normalized version that can |
| 415 // be used as a log stream key. | 488 // be used as a log stream key. |
| 416 func normalizeHash(v string) (string, error) { | 489 func normalizeHash(v string) (string, error) { |
| 417 if decodeSize := hex.DecodedLen(len(v)); decodeSize != sha256.Size { | 490 if decodeSize := hex.DecodedLen(len(v)); decodeSize != sha256.Size { |
| 418 return "", fmt.Errorf("invalid SHA256 hash size (%d != %d)", dec
odeSize, sha256.Size) | 491 return "", fmt.Errorf("invalid SHA256 hash size (%d != %d)", dec
odeSize, sha256.Size) |
| 419 } | 492 } |
| 420 b, err := hex.DecodeString(v) | 493 b, err := hex.DecodeString(v) |
| 421 if err != nil { | 494 if err != nil { |
| 422 return "", err | 495 return "", err |
| (...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 574 // were created before the supplied time. | 647 // were created before the supplied time. |
| 575 func AddOlderFilter(q *ds.Query, t time.Time) *ds.Query { | 648 func AddOlderFilter(q *ds.Query, t time.Time) *ds.Query { |
| 576 return q.Lt("Created", t.UTC()).Order("-Created") | 649 return q.Lt("Created", t.UTC()).Order("-Created") |
| 577 } | 650 } |
| 578 | 651 |
| 579 // AddNewerFilter adds a filter to queries that restricts them to results that | 652 // AddNewerFilter adds a filter to queries that restricts them to results that |
| 580 // were created after the supplied time. | 653 // were created after the supplied time. |
| 581 func AddNewerFilter(q *ds.Query, t time.Time) *ds.Query { | 654 func AddNewerFilter(q *ds.Query, t time.Time) *ds.Query { |
| 582 return q.Gt("Created", t.UTC()).Order("-Created") | 655 return q.Gt("Created", t.UTC()).Order("-Created") |
| 583 } | 656 } |
| OLD | NEW |