Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/sha256" | 8 "crypto/sha256" |
| 9 "encoding/hex" | 9 "encoding/hex" |
| 10 "errors" | 10 "errors" |
| 11 "fmt" | 11 "fmt" |
| 12 "strings" | 12 "strings" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "github.com/golang/protobuf/proto" | 15 "github.com/golang/protobuf/proto" |
| 16 ds "github.com/luci/gae/service/datastore" | 16 ds "github.com/luci/gae/service/datastore" |
| 17 "github.com/luci/luci-go/common/logdog/types" | 17 "github.com/luci/luci-go/common/logdog/types" |
| 18 "github.com/luci/luci-go/common/proto/logdog/logpb" | 18 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 19 ) | 19 ) |
| 20 | 20 |
| 21 // currentLogStreamSchema is the current schema version of the LogStream. | |
| 22 // Changes that are not backward-compatible should update this field so | |
| 23 // migration logic and scripts can translate appropriately. | |
| 24 const currentLogStreamSchema = "1" | |
| 25 | |
| 21 // LogStreamState is the archival state of the log stream. | 26 // LogStreamState is the archival state of the log stream. |
| 22 type LogStreamState int | 27 type LogStreamState int |
| 23 | 28 |
| 24 const ( | 29 const ( |
| 25 » // LSPending indicates that no archival has occurred yet. | 30 » // LSStreaming indicates that the log stream is still streaming. This im plies |
| 26 » LSPending LogStreamState = iota | 31 » // that no terminal index has been identified yet. |
| 27 » // LSTerminated indicates that the log stream has received a terminal in dex | 32 » LSStreaming LogStreamState = iota |
| 28 » // and is awaiting archival. | 33 » // LSArchiveTasked indicates that the log stream has had an archival tas k |
| 29 » LSTerminated | 34 » // generated for it and is awaiting archival. |
| 30 » // LSArchived indicates that the log stream has been successfully archiv ed but | 35 » LSArchiveTasked |
| 31 » // has not yet been cleaned up. | 36 » // LSArchived indicates that the log stream has been successfully archiv ed. |
| 32 LSArchived | 37 LSArchived |
| 33 ) | 38 ) |
| 34 | 39 |
| 40 func (s LogStreamState) String() string { | |
| 41 switch s { | |
| 42 case LSStreaming: | |
| 43 return "STREAMING" | |
| 44 case LSArchiveTasked: | |
| 45 return "ARCHIVE_TASKED" | |
| 46 case LSArchived: | |
| 47 return "ARCHIVED" | |
| 48 default: | |
| 49 return fmt.Sprintf("UNKNOWN(%d)", s) | |
| 50 } | |
| 51 } | |
| 52 | |
| 53 // Archived returns true if this LogStreamState represents a finished archival. | |
| 54 func (s LogStreamState) Archived() bool { | |
| 55 return s >= LSArchived | |
| 56 } | |
| 57 | |
| 35 // LogStream is the primary datastore model containing information and state of | 58 // LogStream is the primary datastore model containing information and state of |
| 36 // an individual log stream. | 59 // an individual log stream. |
| 37 // | 60 // |
| 38 // This structure contains the standard queryable fields, and is the source of | 61 // This structure contains the standard queryable fields, and is the source of |
| 39 // truth for log stream state. Writes to LogStream should be done via Put, which | 62 // truth for log stream state. Writes to LogStream should be done via Put, which |
| 40 // will ensure that the LogStream's related query objects are kept in sync. | 63 // will ensure that the LogStream's related query objects are kept in sync. |
| 41 // | 64 // |
| 42 // This structure has additional datastore fields imposed by the | 65 // This structure has additional datastore fields imposed by the |
| 43 // PropertyLoadSaver. These fields enable querying against some of the complex | 66 // PropertyLoadSaver. These fields enable querying against some of the complex |
| 44 // data types: | 67 // data types: |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 56 // | 79 // |
| 57 // - _Tags is a string slice containing: | 80 // - _Tags is a string slice containing: |
| 58 // - KEY=[VALUE] key/value tags. | 81 // - KEY=[VALUE] key/value tags. |
| 59 // - KEY key presence tags. | 82 // - KEY key presence tags. |
| 60 // | 83 // |
| 61 // - _Terminated is true if the LogStream has been terminated. | 84 // - _Terminated is true if the LogStream has been terminated. |
| 62 // - _Archived is true if the LogStream has been archived. | 85 // - _Archived is true if the LogStream has been archived. |
| 63 // | 86 // |
| 64 // Most of the values in QueryBase are static. Those that change can only be | 87 // Most of the values in QueryBase are static. Those that change can only be |
| 65 // changed through service endpoint methods. | 88 // changed through service endpoint methods. |
| 66 // | |
| 67 // LogStream's QueryBase is authortative. | |
| 68 type LogStream struct { | 89 type LogStream struct { |
| 90 // Schema is the datastore schema version for this object. This can be u sed | |
| 91 // to facilitate schema migrations. | |
| 92 // | |
| 93 // The current schema is currentLogStreamSchema. | |
| 94 Schema string | |
| 95 | |
| 69 // Prefix is this log stream's prefix value. Log streams with the same p refix | 96 // Prefix is this log stream's prefix value. Log streams with the same p refix |
| 70 // are logically grouped. | 97 // are logically grouped. |
| 71 Prefix string | 98 Prefix string |
| 72 // Name is the unique name of this log stream within the Prefix scope. | 99 // Name is the unique name of this log stream within the Prefix scope. |
| 73 Name string | 100 Name string |
| 74 | 101 |
| 75 // State is the log stream's current state. | 102 // State is the log stream's current state. |
| 76 State LogStreamState | 103 State LogStreamState |
| 77 | 104 |
| 78 // Purged, if true, indicates that this log stream has been marked as pu rged. | 105 // Purged, if true, indicates that this log stream has been marked as pu rged. |
| 79 // Non-administrative queries and requests for this stream will operate as | 106 // Non-administrative queries and requests for this stream will operate as |
| 80 // if this entry doesn't exist. | 107 // if this entry doesn't exist. |
| 81 Purged bool | 108 Purged bool |
| 82 | 109 |
| 83 // Secret is the Butler secret value for this stream. | 110 // Secret is the Butler secret value for this stream. |
| 84 // | 111 // |
| 85 // This value may only be returned to LogDog services; it is not user-vi sible. | 112 // This value may only be returned to LogDog services; it is not user-vi sible. |
| 86 Secret []byte `gae:",noindex"` | 113 Secret []byte `gae:",noindex"` |
| 87 | 114 |
| 88 // Created is the time when this stream was created. | 115 // Created is the time when this stream was created. |
| 89 Created time.Time | 116 Created time.Time |
| 90 » // Updated is the Coordinator's record of when this log stream was last | 117 » // TerminatedTime is the Coordinator's record of when this log stream wa s |
| 91 » // updated. | 118 » // terminated. |
| 92 » Updated time.Time | 119 » TerminatedTime time.Time `gae:",noindex"` |
| 120 » // ArchivedTime is the Coordinator's record of when this log stream was | |
| 121 » // archived. | |
| 122 » ArchivedTime time.Time `gae:",noindex"` | |
| 93 | 123 |
| 94 // ProtoVersion is the version string of the protobuf, as reported by th e | 124 // ProtoVersion is the version string of the protobuf, as reported by th e |
| 95 // Collector (and ultimately self-identified by the Butler). | 125 // Collector (and ultimately self-identified by the Butler). |
| 96 ProtoVersion string | 126 ProtoVersion string |
| 97 // Descriptor is the binary protobuf data LogStreamDescriptor. | 127 // Descriptor is the binary protobuf data LogStreamDescriptor. |
| 98 Descriptor []byte `gae:",noindex"` | 128 Descriptor []byte `gae:",noindex"` |
| 99 // ContentType is the MIME-style content type string for this stream. | 129 // ContentType is the MIME-style content type string for this stream. |
| 100 ContentType string | 130 ContentType string |
| 101 // StreamType is the data type of the stream. | 131 // StreamType is the data type of the stream. |
| 102 StreamType logpb.StreamType | 132 StreamType logpb.StreamType |
| 103 // Timestamp is the Descriptor's recorded client-side timestamp. | 133 // Timestamp is the Descriptor's recorded client-side timestamp. |
| 104 Timestamp time.Time | 134 Timestamp time.Time |
| 105 | 135 |
| 106 // Tags is a set of arbitrary key/value tags associated with this stream . Tags | 136 // Tags is a set of arbitrary key/value tags associated with this stream . Tags |
| 107 // can be queried against. | 137 // can be queried against. |
| 108 // | 138 // |
| 109 // The serialization/deserialization is handled manually in order to ena ble | 139 // The serialization/deserialization is handled manually in order to ena ble |
| 110 // key/value queries. | 140 // key/value queries. |
| 111 Tags TagMap `gae:"-"` | 141 Tags TagMap `gae:"-"` |
| 112 | 142 |
| 113 // Source is the set of source strings sent by the Butler. | 143 // Source is the set of source strings sent by the Butler. |
| 114 Source []string | 144 Source []string |
| 115 | 145 |
| 116 » // TerminalIndex is the log stream index of the last log entry in the st ream. | 146 » // TerminalIndex is the index of the last log entry in the stream. |
| 117 » // If the value is -1, the log is still streaming. | 147 » // |
| 148 » // If this is <0, the log stream is either still streaming or has been | |
| 149 » // archived with no log entries. | |
| 118 TerminalIndex int64 `gae:",noindex"` | 150 TerminalIndex int64 `gae:",noindex"` |
| 119 | 151 |
| 152 // ArchiveLogEntryCount is the number of LogEntry records that were arch ived | |
| 153 // for this log stream. | |
| 154 // | |
| 155 // This is valid only if the log stream is Archived. | |
| 156 ArchiveLogEntryCount int64 `gae:",noindex"` | |
| 157 // ArchiveTaskName is the name of this LogStream's archival task. | |
| 158 ArchiveTaskName string `gae:",noindex"` | |
| 159 | |
| 120 // ArchiveIndexURL is the Google Storage URL where the log stream's inde x is | 160 // ArchiveIndexURL is the Google Storage URL where the log stream's inde x is |
| 121 // archived. | 161 // archived. |
| 122 ArchiveIndexURL string `gae:",noindex"` | 162 ArchiveIndexURL string `gae:",noindex"` |
| 123 // ArchiveIndexSize is the size, in bytes, of the archived Index. It wil l be | 163 // ArchiveIndexSize is the size, in bytes, of the archived Index. It wil l be |
| 124 // zero if the file is not archived. | 164 // zero if the file is not archived. |
| 125 ArchiveIndexSize int64 `gae:",noindex"` | 165 ArchiveIndexSize int64 `gae:",noindex"` |
| 126 // ArchiveStreamURL is the Google Storage URL where the log stream's raw | 166 // ArchiveStreamURL is the Google Storage URL where the log stream's raw |
| 127 // stream data is archived. If this is not empty, the log stream is cons idered | 167 // stream data is archived. If this is not empty, the log stream is cons idered |
| 128 // archived. | 168 // archived. |
| 129 ArchiveStreamURL string `gae:",noindex"` | 169 ArchiveStreamURL string `gae:",noindex"` |
| 130 // ArchiveStreamSize is the size, in bytes, of the archived stream. It w ill be | 170 // ArchiveStreamSize is the size, in bytes, of the archived stream. It w ill be |
| 131 // zero if the file is not archived. | 171 // zero if the file is not archived. |
| 132 ArchiveStreamSize int64 `gae:",noindex"` | 172 ArchiveStreamSize int64 `gae:",noindex"` |
| 133 // ArchiveDataURL is the Google Storage URL where the log stream's assem bled | 173 // ArchiveDataURL is the Google Storage URL where the log stream's assem bled |
| 134 // data is archived. If this is not empty, the log stream is considered | 174 // data is archived. If this is not empty, the log stream is considered |
| 135 // archived. | 175 // archived. |
| 136 ArchiveDataURL string `gae:",noindex"` | 176 ArchiveDataURL string `gae:",noindex"` |
| 137 // ArchiveDataSize is the size, in bytes, of the archived data. It will be | 177 // ArchiveDataSize is the size, in bytes, of the archived data. It will be |
| 138 // zero if the file is not archived. | 178 // zero if the file is not archived. |
| 139 ArchiveDataSize int64 `gae:",noindex"` | 179 ArchiveDataSize int64 `gae:",noindex"` |
| 140 » // ArchiveWhole is true if archival is complete and the archived log str eam | 180 » // ArchiveErrors is the number of errors encountered during archival. Th is |
| 141 » // was not missing any entries. | 181 » // will be incremented when incomplete archival is permitted, but the ar chival |
| 142 » ArchiveWhole bool | 182 » // still failed due to an error (e.g., data corruption). If this exceeds a |
| 183 » // threshold, the stream may be marked archived even if the archival fai led. | |
| 184 » ArchiveErrors int | |
| 143 | 185 |
| 144 » // _ causes datastore to ignore unrecognized fields and strip them in fu ture | 186 » // extra causes datastore to ignore unrecognized fields and strip them i n |
| 145 » // writes. | 187 » // future writes. |
| 146 » _ ds.PropertyMap `gae:"-,extra"` | 188 » extra ds.PropertyMap `gae:"-,extra"` |
| 147 | 189 |
| 148 // hashID is the cached generated ID from the stream's Prefix/Name field s. If | 190 // hashID is the cached generated ID from the stream's Prefix/Name field s. If |
| 149 // this is populated, ID metadata will be retrieved from this field inst ead of | 191 // this is populated, ID metadata will be retrieved from this field inst ead of |
| 150 // generated. | 192 // generated. |
| 151 hashID string | 193 hashID string |
| 194 | |
| 195 // noDatastoreValidate is a testing parameter to instruct the LogStream not to | |
|
Vadim Sh.
2016/04/07 01:21:32
noDSValidate
dnj
2016/04/11 17:20:03
Done.
| |
| 196 // validate before reading/writing to datastore. It can be controlled by | |
| 197 // calling SetDSValidate(). | |
| 198 noDSValidate bool | |
| 152 } | 199 } |
| 153 | 200 |
| 154 var _ interface { | 201 var _ interface { |
| 155 ds.PropertyLoadSaver | 202 ds.PropertyLoadSaver |
| 156 ds.MetaGetterSetter | 203 ds.MetaGetterSetter |
| 157 } = (*LogStream)(nil) | 204 } = (*LogStream)(nil) |
| 158 | 205 |
| 159 // NewLogStream returns a LogStream instance with its ID field initialized based | 206 // NewLogStream returns a LogStream instance with its ID field initialized based |
| 160 // on the supplied path. | 207 // on the supplied path. |
| 161 // | 208 // |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 214 | 261 |
| 215 switch k { | 262 switch k { |
| 216 case "_Tags": | 263 case "_Tags": |
| 217 // Load the tag map. Ignore errors. | 264 // Load the tag map. Ignore errors. |
| 218 tm, _ := tagMapFromProperties(v) | 265 tm, _ := tagMapFromProperties(v) |
| 219 s.Tags = tm | 266 s.Tags = tm |
| 220 } | 267 } |
| 221 delete(pmap, k) | 268 delete(pmap, k) |
| 222 } | 269 } |
| 223 | 270 |
| 224 » return ds.GetPLS(s).Load(pmap) | 271 » if err := ds.GetPLS(s).Load(pmap); err != nil { |
| 272 » » return err | |
| 273 » } | |
| 274 | |
| 275 » // Migrate schema (if needed), then validate. | |
| 276 » if err := s.migrateSchema(); err != nil { | |
| 277 » » return err | |
| 278 » } | |
| 279 | |
| 280 » if !s.noDSValidate { | |
| 281 » » if err := s.Validate(); err != nil { | |
| 282 » » » return err | |
| 283 » » } | |
| 284 » } | |
| 285 » return nil | |
| 225 } | 286 } |
| 226 | 287 |
| 227 // Save implements ds.PropertyLoadSaver. | 288 // Save implements ds.PropertyLoadSaver. |
| 228 func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) { | 289 func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) { |
| 290 if !s.noDSValidate { | |
| 291 if err := s.Validate(); err != nil { | |
| 292 return nil, err | |
| 293 } | |
| 294 s.Schema = currentLogStreamSchema | |
| 295 } | |
| 296 | |
| 297 // Save default struct fields. | |
| 229 pmap, err := ds.GetPLS(s).Save(withMeta) | 298 pmap, err := ds.GetPLS(s).Save(withMeta) |
| 230 if err != nil { | 299 if err != nil { |
| 231 return nil, err | 300 return nil, err |
| 232 } | 301 } |
| 233 | 302 |
| 234 // Encode _Tags. | 303 // Encode _Tags. |
| 235 pmap["_Tags"], err = s.Tags.toProperties() | 304 pmap["_Tags"], err = s.Tags.toProperties() |
| 236 if err != nil { | 305 if err != nil { |
| 237 return nil, fmt.Errorf("failed to encode tags: %v", err) | 306 return nil, fmt.Errorf("failed to encode tags: %v", err) |
| 238 } | 307 } |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 277 if s.Prefix == "" || s.Name == "" { | 346 if s.Prefix == "" || s.Name == "" { |
| 278 panic("cannot generate ID hash: Prefix and Name are not populated") | 347 panic("cannot generate ID hash: Prefix and Name are not populated") |
| 279 } | 348 } |
| 280 | 349 |
| 281 hash := sha256.Sum256([]byte(s.Path())) | 350 hash := sha256.Sum256([]byte(s.Path())) |
| 282 s.hashID = hex.EncodeToString(hash[:]) | 351 s.hashID = hex.EncodeToString(hash[:]) |
| 283 } | 352 } |
| 284 return s.hashID | 353 return s.hashID |
| 285 } | 354 } |
| 286 | 355 |
| 287 // Put writes this LogStream to the Datastore. Before writing, it validates that | |
| 288 // LogStream is complete. | |
| 289 func (s *LogStream) Put(di ds.Interface) error { | |
| 290 if err := s.Validate(); err != nil { | |
| 291 return err | |
| 292 } | |
| 293 return di.Put(s) | |
| 294 } | |
| 295 | |
| 296 // Validate evaluates the state and data contents of the LogStream and returns | 356 // Validate evaluates the state and data contents of the LogStream and returns |
| 297 // an error if it is invalid. | 357 // an error if it is invalid. |
| 298 func (s *LogStream) Validate() error { | 358 func (s *LogStream) Validate() error { |
| 299 if err := types.StreamName(s.Prefix).Validate(); err != nil { | 359 if err := types.StreamName(s.Prefix).Validate(); err != nil { |
| 300 return fmt.Errorf("invalid prefix: %s", err) | 360 return fmt.Errorf("invalid prefix: %s", err) |
| 301 } | 361 } |
| 302 if err := types.StreamName(s.Name).Validate(); err != nil { | 362 if err := types.StreamName(s.Name).Validate(); err != nil { |
| 303 return fmt.Errorf("invalid name: %s", err) | 363 return fmt.Errorf("invalid name: %s", err) |
| 304 } | 364 } |
| 305 if len(s.Secret) != types.StreamSecretLength { | 365 if len(s.Secret) != types.StreamSecretLength { |
| 306 return fmt.Errorf("invalid secret length (%d != %d)", len(s.Secr et), types.StreamSecretLength) | 366 return fmt.Errorf("invalid secret length (%d != %d)", len(s.Secr et), types.StreamSecretLength) |
| 307 } | 367 } |
| 308 if s.ContentType == "" { | 368 if s.ContentType == "" { |
| 309 return errors.New("empty content type") | 369 return errors.New("empty content type") |
| 310 } | 370 } |
| 311 if s.Created.IsZero() { | 371 if s.Created.IsZero() { |
| 312 return errors.New("created time is not set") | 372 return errors.New("created time is not set") |
| 313 } | 373 } |
| 314 » if s.Updated.IsZero() { | 374 |
| 315 » » return errors.New("updated time is not set") | 375 » if s.Terminated() && s.TerminatedTime.IsZero() { |
| 376 » » return errors.New("log stream is terminated, but missing termina ted time") | |
| 316 } | 377 } |
| 317 » if s.Updated.Before(s.Created) { | 378 » if s.Archived() && s.ArchivedTime.IsZero() { |
| 318 » » return fmt.Errorf("updated time must be >= created time (%s < %s )", s.Updated, s.Created) | 379 » » return errors.New("log stream is archived, but missing archived time") |
| 319 } | 380 } |
| 320 | 381 |
| 321 switch s.StreamType { | 382 switch s.StreamType { |
| 322 case logpb.StreamType_TEXT, logpb.StreamType_BINARY, logpb.StreamType_DA TAGRAM: | 383 case logpb.StreamType_TEXT, logpb.StreamType_BINARY, logpb.StreamType_DA TAGRAM: |
| 323 break | 384 break |
| 324 | 385 |
| 325 default: | 386 default: |
| 326 return fmt.Errorf("unsupported stream type: %v", s.StreamType) | 387 return fmt.Errorf("unsupported stream type: %v", s.StreamType) |
| 327 } | 388 } |
| 328 | 389 |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 343 func (s *LogStream) DescriptorValue() (*logpb.LogStreamDescriptor, error) { | 404 func (s *LogStream) DescriptorValue() (*logpb.LogStreamDescriptor, error) { |
| 344 pb := logpb.LogStreamDescriptor{} | 405 pb := logpb.LogStreamDescriptor{} |
| 345 if err := proto.Unmarshal(s.Descriptor, &pb); err != nil { | 406 if err := proto.Unmarshal(s.Descriptor, &pb); err != nil { |
| 346 return nil, err | 407 return nil, err |
| 347 } | 408 } |
| 348 return &pb, nil | 409 return &pb, nil |
| 349 } | 410 } |
| 350 | 411 |
| 351 // Terminated returns true if this stream has been terminated. | 412 // Terminated returns true if this stream has been terminated. |
| 352 func (s *LogStream) Terminated() bool { | 413 func (s *LogStream) Terminated() bool { |
| 353 » return s.State >= LSTerminated | 414 » if s.Archived() { |
| 415 » » return true | |
| 416 » } | |
| 417 » return s.TerminalIndex >= 0 | |
| 354 } | 418 } |
| 355 | 419 |
| 356 // Archived returns true if this stream has been archived. A stream is archived | 420 // Archived returns true if this stream has been archived. |
| 357 // if it has any of its archival properties set. | |
| 358 func (s *LogStream) Archived() bool { | 421 func (s *LogStream) Archived() bool { |
| 359 » return s.State >= LSArchived | 422 » return s.State.Archived() |
| 360 } | 423 } |
| 361 | 424 |
| 362 // ArchiveMatches tests if the supplied Stream, Index, and Data archival URLs | 425 // ArchiveComplete returns true if this stream has been archived and all of its |
| 363 // match the current values. | 426 // log entries were present. |
| 364 func (s *LogStream) ArchiveMatches(sURL, iURL, dURL string) bool { | 427 func (s *LogStream) ArchiveComplete() bool { |
| 365 » return (s.ArchiveStreamURL == sURL && s.ArchiveIndexURL == iURL && s.Arc hiveDataURL == dURL) | 428 » return (s.Archived() && s.ArchiveLogEntryCount == (s.TerminalIndex+1)) |
| 366 } | 429 } |
| 367 | 430 |
| 368 // LoadDescriptor loads the fields in the log stream descriptor into this | 431 // LoadDescriptor loads the fields in the log stream descriptor into this |
| 369 // LogStream entry. These fields are: | 432 // LogStream entry. These fields are: |
| 370 // - Prefix | 433 // - Prefix |
| 371 // - Name | 434 // - Name |
| 372 // - ContentType | 435 // - ContentType |
| 373 // - StreamType | 436 // - StreamType |
| 374 // - Descriptor | 437 // - Descriptor |
| 375 // - Timestamp | 438 // - Timestamp |
| (...skipping 27 matching lines...) Expand all Loading... | |
| 403 // DescriptorProto unmarshals a LogStreamDescriptor from the stream's Descriptor | 466 // DescriptorProto unmarshals a LogStreamDescriptor from the stream's Descriptor |
| 404 // field. It will return an error if the unmarshalling fails. | 467 // field. It will return an error if the unmarshalling fails. |
| 405 func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) { | 468 func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) { |
| 406 desc := logpb.LogStreamDescriptor{} | 469 desc := logpb.LogStreamDescriptor{} |
| 407 if err := proto.Unmarshal(s.Descriptor, &desc); err != nil { | 470 if err := proto.Unmarshal(s.Descriptor, &desc); err != nil { |
| 408 return nil, err | 471 return nil, err |
| 409 } | 472 } |
| 410 return &desc, nil | 473 return &desc, nil |
| 411 } | 474 } |
| 412 | 475 |
| 476 // SetDSValidate controls whether this LogStream is validated prior to being | |
| 477 // read from or written to datastore. | |
| 478 // | |
| 479 // This is a testing parameter, and should NOT be used in production code. | |
| 480 func (s *LogStream) SetDSValidate(v bool) { | |
| 481 s.noDSValidate = !v | |
| 482 } | |
| 483 | |
| 413 // normalizeHash takes a SHA256 hexadecimal string as input. It validates that | 484 // normalizeHash takes a SHA256 hexadecimal string as input. It validates that |
| 414 // it is a valid SHA256 hash and, if so, returns a normalized version that can | 485 // it is a valid SHA256 hash and, if so, returns a normalized version that can |
| 415 // be used as a log stream key. | 486 // be used as a log stream key. |
| 416 func normalizeHash(v string) (string, error) { | 487 func normalizeHash(v string) (string, error) { |
| 417 if decodeSize := hex.DecodedLen(len(v)); decodeSize != sha256.Size { | 488 if decodeSize := hex.DecodedLen(len(v)); decodeSize != sha256.Size { |
| 418 return "", fmt.Errorf("invalid SHA256 hash size (%d != %d)", dec odeSize, sha256.Size) | 489 return "", fmt.Errorf("invalid SHA256 hash size (%d != %d)", dec odeSize, sha256.Size) |
| 419 } | 490 } |
| 420 b, err := hex.DecodeString(v) | 491 b, err := hex.DecodeString(v) |
| 421 if err != nil { | 492 if err != nil { |
| 422 return "", err | 493 return "", err |
| (...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 574 // were created before the supplied time. | 645 // were created before the supplied time. |
| 575 func AddOlderFilter(q *ds.Query, t time.Time) *ds.Query { | 646 func AddOlderFilter(q *ds.Query, t time.Time) *ds.Query { |
| 576 return q.Lt("Created", t.UTC()).Order("-Created") | 647 return q.Lt("Created", t.UTC()).Order("-Created") |
| 577 } | 648 } |
| 578 | 649 |
| 579 // AddNewerFilter adds a filter to queries that restricts them to results that | 650 // AddNewerFilter adds a filter to queries that restricts them to results that |
| 580 // were created after the supplied time. | 651 // were created after the supplied time. |
| 581 func AddNewerFilter(q *ds.Query, t time.Time) *ds.Query { | 652 func AddNewerFilter(q *ds.Query, t time.Time) *ds.Query { |
| 582 return q.Gt("Created", t.UTC()).Order("-Created") | 653 return q.Gt("Created", t.UTC()).Order("-Created") |
| 583 } | 654 } |
| OLD | NEW |